{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "f9bf98ec",
   "metadata": {},
   "source": [
    "# Certification of Robustness using Intervals"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a04e2990",
   "metadata": {},
   "source": [
    "In this notebook we will look at using interval bound propagation (IBP) to assess a neural network's certified robustness. \n",
    "\n",
    "To do so we will use a datapoint's interval, also called box, representation. This captures the minimum and maximum values that a feature can take. \n",
    "\n",
    "Then we propagate the interval through the neural network and, by using interval arithmetic on the neural network components we can determine if a datapoint could have its class changed. \n",
    "\n",
    "The interval domain has the great advantage that it is *fast*. However, this speed comes at the expense of precision - we aggressively over-approximate in the forward pass and so we can often only certify a small subset of the data which is safe. More formally, this technique is sound but incomplete.\n",
    "\n",
    "We can see an example of how imprecision arises by looking at a neural network layer that causes a rotation: "
   ]
  },
  {
   "attachments": {
    "box_domain.png": {
     "image/png": ""
    }
   },
   "cell_type": "markdown",
   "id": "35f7152d",
   "metadata": {},
   "source": [
    "![box_domain.png](attachment:box_domain.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fd0f95db",
   "metadata": {},
   "source": [
    "The exact operation when multiplied with the weight matrix should lead to the rotated rectangle. However, in the interval domain we only consider the maximums and minimums of each feature, thus resulting in the larger red rectangle which contains many excess regions.\n",
    "\n",
    "Let's see how this does in practice!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "c6a8846d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Using device  cuda:0\n"
     ]
    }
   ],
   "source": [
    "import torch\n",
    "import torch.optim as optim\n",
    "import numpy as np\n",
    "\n",
    "from torch import nn\n",
    "from sklearn.utils import shuffle\n",
    "\n",
    "from art.estimators.certification import interval\n",
    "from art.utils import load_mnist, preprocess, to_categorical\n",
    "\n",
    "device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
    "print('Using device ', device)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "00ac4fad",
   "metadata": {},
   "outputs": [],
   "source": [
    "# We make an example pytorch classifier\n",
    "\n",
    "class MNISTModel(torch.nn.Module):\n",
    "    \"\"\"\n",
    "    The base model which we will then convert into one using different abstract domains\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, number_of_classes: int):\n",
    "        super(MNISTModel, self).__init__()\n",
    "\n",
    "        self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
    "\n",
    "        self.conv_1 = torch.nn.Conv2d(in_channels=1,\n",
    "                                      out_channels=32,\n",
    "                                      kernel_size=(3, 3),\n",
    "                                      stride=(1, 1))\n",
    "\n",
    "        self.conv_2 = torch.nn.Conv2d(in_channels=32,\n",
    "                                      out_channels=32,\n",
    "                                      kernel_size=(4, 4),\n",
    "                                      stride=(2, 2))\n",
    "\n",
    "        self.conv_3 = torch.nn.Conv2d(in_channels=32,\n",
    "                                      out_channels=64,\n",
    "                                      kernel_size=(3, 3),\n",
    "                                      stride=(1, 1))\n",
    "\n",
    "        self.conv_4 = torch.nn.Conv2d(in_channels=64,\n",
    "                                      out_channels=64,\n",
    "                                      kernel_size=(4, 4),\n",
    "                                      stride=(2, 2))\n",
    "\n",
    "        self.fc1 = torch.nn.Linear(in_features=1024, out_features=512)\n",
    "        self.fc2 = torch.nn.Linear(in_features=512, out_features=512)\n",
    "        self.fc_out = torch.nn.Linear(in_features=512, out_features=number_of_classes)\n",
    "\n",
    "        self.relu = torch.nn.ReLU()\n",
    "\n",
    "    def forward(self, x: \"torch.Tensor\") -> \"torch.Tensor\":\n",
    "\n",
    "        x = self.relu(self.conv_1(x))\n",
    "        x = self.relu(self.conv_2(x))\n",
    "        x = self.relu(self.conv_3(x))\n",
    "        x = self.relu(self.conv_4(x))\n",
    "\n",
    "        x = torch.flatten(x, 1)\n",
    "\n",
    "        x = self.relu(self.fc1(x))\n",
    "        x = self.relu(self.fc2(x))\n",
    "\n",
    "        return self.fc_out(x)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "b239e7da",
   "metadata": {},
   "outputs": [],
   "source": [
    "model = MNISTModel(number_of_classes=10)\n",
    "model.to(device)\n",
    "opt = optim.Adam(model.parameters(), lr=1e-4)\n",
    "criterion = nn.CrossEntropyLoss()\n",
    "(x_train, y_train), (x_test, y_test), min_, max_ = load_mnist()\n",
    "\n",
    "x_test = np.squeeze(x_test)\n",
    "x_test = np.expand_dims(x_test, axis=1)\n",
    "y_test = np.argmax(y_test, axis=1)\n",
    "\n",
    "x_train = np.squeeze(x_train)\n",
    "x_train = np.expand_dims(x_train, axis=1)\n",
    "y_train = np.argmax(y_train, axis=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "bd22bbf7",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "End of epoch 0 loss 0.2816831171512604\n",
      "End of epoch 1 loss 0.07683608680963516\n",
      "End of epoch 2 loss 0.051124896854162216\n",
      "End of epoch 3 loss 0.03650381416082382\n",
      "End of epoch 4 loss 0.027725202962756157\n"
     ]
    }
   ],
   "source": [
    "# train the model normally\n",
    "\n",
    "def standard_train(model, opt, criterion, x, y, bsize=32, epochs=5):\n",
    "    num_of_batches = int(len(x) / bsize)\n",
    "    for epoch in range(epochs):\n",
    "        x, y = shuffle(x, y)\n",
    "        loss_list = []\n",
    "        for bnum in range(num_of_batches):\n",
    "            x_batch = np.copy(x[bnum * bsize:(bnum + 1) * bsize])\n",
    "            y_batch = np.copy(y[bnum * bsize:(bnum + 1) * bsize])\n",
    "\n",
    "            x_batch = torch.from_numpy(x_batch).float().to(device)\n",
    "            y_batch = torch.from_numpy(y_batch).type(torch.LongTensor).to(device)\n",
    "\n",
    "            # zero the parameter gradients\n",
    "            opt.zero_grad()\n",
    "            outputs = model(x_batch)\n",
    "            loss = criterion(outputs, y_batch)\n",
    "            loss_list.append(loss.data.cpu())\n",
    "            loss.backward()\n",
    "            opt.step()\n",
    "        print('End of epoch {} loss {}'.format(epoch, np.mean(loss_list)))\n",
    "    return model\n",
    "\n",
    "model = standard_train(model=model,\n",
    "                       opt=opt,\n",
    "                       criterion=criterion,\n",
    "                       x=x_train,\n",
    "                       y=y_train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "bb5195fa",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Test acc:  98.72\n"
     ]
    }
   ],
   "source": [
    "# lets now get the predicions for the MNIST test set and see how well our model is doing.\n",
    "with torch.no_grad():\n",
    "    test_preds = model(torch.from_numpy(x_test).float().to(device))\n",
    "\n",
    "test_preds = np.argmax(test_preds.cpu().detach().numpy(), axis=1)\n",
    "print('Test acc: ', np.mean(test_preds == y_test) * 100)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "b14657ad",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "registered <class 'torch.nn.modules.conv.Conv2d'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.conv.Conv2d'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.conv.Conv2d'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.conv.Conv2d'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.linear.Linear'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.linear.Linear'>\n",
      "registered <class 'torch.nn.modules.activation.ReLU'>\n",
      "registered <class 'torch.nn.modules.linear.Linear'>\n",
      "Inferred reshape on op num 8\n"
     ]
    }
   ],
   "source": [
    "# But how robust are these predictions? \n",
    "# We can now examine this neural network's certified robustness. \n",
    "# We pass it into PyTorchIBPClassifier. We will get a print out showing which \n",
    "# neural network layers have been registered. There will also be a \n",
    "# warning to tell us that PytorchInterval currently infers a reshape when \n",
    "# a neural network goes from using convolutional to dense layers. \n",
    "# This will cover the majority of use cases, however, if not then the \n",
    "# certification layers in art.estimators.certification.interval.interval.py \n",
    "# can be used to directly build a certified model structure.\n",
    "\n",
    "interval_model = interval.PyTorchIBPClassifier(model=model, \n",
    "                                               clip_values=(0, 1), \n",
    "                                               loss=nn.CrossEntropyLoss(), \n",
    "                                               input_shape=(1, 28, 28), \n",
    "                                               nb_classes=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "06b46793",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Test acc:  98.72\n",
      "Certified score  0.0\n"
     ]
    }
   ],
   "source": [
    "bound = 0.01\n",
    "num_certified = 0\n",
    "num_correct = 0\n",
    "\n",
    "# Use the test data to check its certified robustness.\n",
    "original_x = np.copy(x_test)\n",
    "\n",
    "# Regular accuracy on normal data\n",
    "with torch.no_grad():\n",
    "    test_preds = model(torch.from_numpy(x_test).float().to(device))\n",
    "\n",
    "test_preds = np.argmax(test_preds.cpu().detach().numpy(), axis=1)\n",
    "print('Test acc: ', np.mean(test_preds == y_test))\n",
    "\n",
    "# Here we will manually convert the data into its interval representation\n",
    "upper_bounds = np.clip(np.expand_dims(x_test, axis=1) + bound, 0, 1)\n",
    "lower_bounds = np.clip(np.expand_dims(x_test, axis=1) - bound, 0, 1)\n",
    "\n",
    "interval_x = np.concatenate([lower_bounds, upper_bounds], axis=1)\n",
    "\n",
    "with torch.no_grad():\n",
    "    interval_preds = interval_model.predict_intervals(x=interval_x,\n",
    "                                                      is_interval=True,\n",
    "                                                      batch_size=32)\n",
    "    cert_results = interval_model.certify(preds=interval_preds, labels=y_test)\n",
    "    print('Certified score ', np.mean(cert_results))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4ec41143",
   "metadata": {},
   "source": [
    "We can see that this is very low! Much lower than the certification you could get with using Zonotopes for example (see the deepz notebook for comparison). So why use intervals?\n",
    "\n",
    "- Computationally it is very fast: only x2 overhead compared to normal classification as each datapoint is represented by upper and lower bounds. \n",
    "- By comparison Zonotopes can grow (particularly in terms of memory) hundreds of times larger. \n",
    "- We can improve the performance by orders of magnitude if we combine it with methods like certified adversarial training.\n",
    "\n",
    "None the less, we can use the interval domain to certify for smaller regions of inputs (or also for lower dimensional inputs). Let's now use it to certify against a pixel brightening attack"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "20bd7f5b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Certified score  0.4766\n"
     ]
    }
   ],
   "source": [
    "bound = 0.005\n",
    "int_x = np.expand_dims(x_test, axis=1)\n",
    "\n",
    "# pixels of a certain brighness can be raised to the maximum value.\n",
    "upper_bounds = np.where(int_x > 1 - bound, 1, int_x)\n",
    "interval_x = np.concatenate([int_x, upper_bounds], axis=1)\n",
    "\n",
    "with torch.no_grad():\n",
    "    interval_preds = interval_model.predict_intervals(x=interval_x,\n",
    "                                                      is_interval=True,\n",
    "                                                      batch_size=32)\n",
    "    cert_results = interval_model.certify(preds=interval_preds, labels=y_test)\n",
    "    print('Certified score ', np.mean(cert_results))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}