{ "cells": [ { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "iaACKcYpTKZz" }, "source": [ "# Adversarial Example Generation\n", "\n", "In real-world applications of machine learning models, an important consideration is robustness. If a model performs well only under limited conditions, it may not be suitable for use in a real-world, often noisy, environment. One way of assessing the robustness of a model is to consider which adversarial attacks can defeat it. In this notebook we will follow the [adversarial example generation](https://pytorch.org/tutorials/beginner/fgsm_tutorial.html) tutorial from the PyTorch website and implement it in torchbearer. \n", "\n", "Before continuing, it would be a good idea to look over that example and familiarise yourself with the content, since we won't be covering the motivation and background in as much detail, preferring to look at the implementation details. \n", "\n", "**Note**: The easiest way to use this tutorial is as a colab notebook, which allows you to dive in with no setup. We recommend you enable a free GPU with\n", "\n", "> **Runtime**   →   **Change runtime type**   →   **Hardware Accelerator: GPU**\n", "\n", "## Install Torchbearer\n", "\n", "First we install torchbearer if needed. " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.3.2\n" ] } ], "source": [ "#%%\n", "try:\n", "    import torchbearer\n", "except ImportError:\n", "    !pip install -q torchbearer\n", "    import torchbearer\n", "\n", "    # If problems arise, try\n", "    # pip install git+https://github.com/pytorchbearer/torchbearer\n", "    # import torchbearer\n", "\n", "print(torchbearer.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model and Data\n", "\n", "First we load the same MNIST model that was used in the PyTorch example or train a model from scratch if the file isn't available. 
Pretrained weights can be found [here](https://drive.google.com/drive/folders/1fn83DF14tWmit0RTKWRhPq5uVXt73e0h?usp=sharing). We then load the MNIST test set. The fallback training code sits in the `except FileNotFoundError` branch of the cell below, so it only runs (for 5 epochs) when the weights file is missing. " ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "colab": {}, "colab_type": "code", "id": "t-eH5WeuU7v6" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "File: data/lenet_mnist_model.pth was not found, training a model from scratch instead\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b0180d4b20964e5e915ec19d28c9fdd8", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/5(t)', max=469), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "0b66d62c451644358194270cc624fabf", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='1/5(t)', max=469), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "62744c13a9a445248c36b683635a5591", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='2/5(t)', max=469), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "619d0441933048dc840986e50cfcb77c", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='3/5(t)', max=469), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": 
[ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "b2eae388b71440229867d4386195e9ac", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='4/5(t)', max=469), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", "from torchbearer import Trial\n", "\n", "class Net(nn.Module):\n", "    def __init__(self):\n", "        super(Net, self).__init__()\n", "        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)\n", "        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)\n", "        self.conv2_drop = nn.Dropout2d()\n", "        self.fc1 = nn.Linear(320, 50)\n", "        self.fc2 = nn.Linear(50, 10)\n", "\n", "    def forward(self, x):\n", "        x = F.relu(F.max_pool2d(self.conv1(x), 2))\n", "        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))\n", "        x = x.view(-1, 320)\n", "        x = F.relu(self.fc1(x))\n", "        x = F.dropout(x, training=self.training)\n", "        x = self.fc2(x)\n", "        return F.log_softmax(x, dim=1)\n", "\n", "\n", "import torchvision.datasets as datasets\n", "import torchvision.transforms as transforms\n", "\n", "# MNIST Test dataset and dataloader declaration\n", "test_loader = torch.utils.data.DataLoader(\n", "    datasets.MNIST('data', train=False, download=True, transform=transforms.Compose([\n", "        transforms.ToTensor(),\n", "    ])),\n", "    batch_size=1, shuffle=True)\n", "\n", "# Use cuda if possible\n", "device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", "\n", "# Load the pretrained model\n", "model = Net()\n", "pretrained_model = \"data/lenet_mnist_model.pth\"\n", "\n", "try:\n", "    state = torch.load(pretrained_model, map_location='cpu')\n", "    model.load_state_dict(state)\n", "except FileNotFoundError:\n", "    print('File: ' + pretrained_model + ' was not found, training a model from scratch instead')\n", "\n", "    train_loader = torch.utils.data.DataLoader(\n", "        datasets.MNIST('data', train=True, download=True, transform=transforms.Compose([\n", "            transforms.ToTensor(),\n", "        ])),\n", "        batch_size=128, shuffle=True)\n", "    from torch.optim import Adam\n", "    trial = Trial(model, optimizer=Adam(model.parameters(), 3e-4), criterion=nn.CrossEntropyLoss(), metrics=['acc'])\n", "    _ = trial.with_train_generator(train_loader).to(device).run(5)" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "s0T1AfbCV73U" }, "source": [ "The Attack\n", "========================================\n", "\n", "The attack used in the PyTorch example is the Fast Gradient Sign Method (FGSM) attack by Goodfellow _et al._ in [Explaining and Harnessing Adversarial Examples](https://arxiv.org/abs/1412.6572). This attack takes advantage of access to the model gradients by taking a single gradient ascent step on the input, using only the sign of the gradient, to increase the classification loss. \n", "\n", "In implementation, the attack is defined by a function of the original image, its gradient and an epsilon, which sets the size of the per-pixel step taken by the attack. " ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "colab": {}, "colab_type": "code", "id": "mtyia_TuXdYo" }, "outputs": [], "source": [ "def fgsm_attack(image, epsilon, data_grad):\n", "    # Collect the element-wise sign of the data gradient\n", "    sign_data_grad = data_grad.sign()\n", "    # Create the perturbed image by adjusting each pixel of the input image\n", "    perturbed_image = image + epsilon*sign_data_grad\n", "    # Adding clipping to maintain [0,1] range\n", "    perturbed_image = torch.clamp(perturbed_image, 0, 1)\n", "    # Return the perturbed image\n", "    return perturbed_image" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "JbBJWsQwYPnF" }, "source": [ "In torchbearer, we would usually implement something like this in a callback, so that we can create a Trial with an MNIST classifier and take advantage of the training loop and metrics built in. 
\n", "\n", "So let's think about how we need to perform the attack.\n", "- We need to make sure the input image tracks gradients, which we can set **on_sample**\n", "- Optionally, we can skip images which are already misclassified, which can be done **on_forward**, once we know the predictions\n", "- We need to run the attack after generating the gradients, which can be done **on_backward**\n", "- We would like to do some visualisation, which can be done **on_step_training**, but we will implement this in a separate callback for simplicity. \n", "\n", "Now that we have a callback structure, we shall implement it, referring to the [testing function](https://pytorch.org/tutorials/beginner/fgsm_tutorial.html#testing-function) in the original PyTorch example. " ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "colab": {}, "colab_type": "code", "id": "oOHLhCExZ2ul" }, "outputs": [], "source": [ "from torchbearer import Callback, state_key\n", "\n", "VIS_IMAGE = state_key('vis_image')\n", "PERTURBED_IMAGE = state_key('perturbed_image')\n", "\n", "class FGSA(Callback):\n", "    def __init__(self, epsilon):\n", "        self.epsilon = epsilon\n", "        self.skip = False\n", "\n", "    def on_sample(self, state):\n", "        state[torchbearer.X].requires_grad = True\n", "\n", "    def on_forward(self, state):\n", "        pred, true = state[torchbearer.Y_PRED], state[torchbearer.Y_TRUE]\n", "        pred = pred.max(1, keepdim=True)[1]\n", "        if pred != true:\n", "            # Skip already misclassified example\n", "            self.skip = True\n", "\n", "    def on_backward(self, state):\n", "        if not self.skip:\n", "            image = state[torchbearer.X]\n", "            image_grad = image.grad.data\n", "            perturbed_image = fgsm_attack(image, self.epsilon, image_grad)\n", "            state[PERTURBED_IMAGE] = torch.cat((image, perturbed_image))\n", "            # We replace the prediction so that we can use accuracy metrics easily\n", "            state[torchbearer.Y_PRED] = state[torchbearer.MODEL](perturbed_image)\n", "\n", "    def on_step_training(self, state):\n", "        # Make sure to reset the skip flag for each image\n", "        state[torchbearer.MODEL].zero_grad()\n", "        self.skip = False" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "uwba06KDYGGG" }, "source": [ "Running and Testing the Attack\n", "==========================================\n", "\n", "Before doing any visualisation, let's run the attack and just look at how well the model classifies. We create a trial with a cross entropy loss, our attack as a callback (let's choose a large epsilon so we see a large effect) and accuracy as the only metric. If our attack works we should see a very low accuracy. We'll first run 500 steps without an attack and then 500 steps with an attack and compare the accuracies. " ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "colab": {}, "colab_type": "code", "id": "ycMXboNkvEiF" }, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "046c9eee435b40ec982418a551449b69", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "bc8d3eb875a1471ab05551163c64edff", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)\n", "\n", "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(0.5)], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For our 
pretrained model we get an accuracy of over 90%. After attacking, we see an accuracy of below 10%. Our attack was successful. So what do our adversarial images look like? Let's add a visualisation callback that retrieves the perturbed image that we saved in state earlier. Recall that we added a pair of images under that state key, the original and the perturbed. The outputs below will show a number of these pairs. " ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "8c8659dfd89b42e3a908b5d444568190", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "91f1cbb3f45040f9ac118dcde9d36531", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "3d211fd609f94445a5f00a507e6d1ee5", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "f8eed52d61fd428ca9a438a2f99e7200", "version_major": 2, "version_minor": 0 }, "text/plain": [ "HBox(children=(IntProgress(value=0, description='0/1(t)', max=500), HTML(value='')))" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "from torchbearer.callbacks import once, on_step_training\n", "from torchbearer.callbacks.imaging import MakeGrid\n", "%matplotlib inline\n", "\n", "vis = lambda x: MakeGrid(PERTURBED_IMAGE).to_pyplot().to_file('perturbed_' + str(x) + '.png').on_train()\n", "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(0.0), vis(0.0)], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)\n", "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(0.1), vis(0.1)], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)\n", "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(0.2), vis(0.2)], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)\n", "trial = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(0.3), vis(0.3)], metrics=['acc']).with_train_generator(test_loader, steps=500).to(device).run(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The PyTorch example goes on to draw a graph of accuracy for various epsilons. We shall quickly show how we can achieve a similar thing with torchbearer. All we need to do is create a new trial on each loop iteration and grab the accuracies out of the history. We use the same plotting code as the PyTorch example. " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "import numpy as np\n", "\n", "epsilons = [0, .05, .1, .15, .2, .25, .3]\n", "accuracies = []\n", "\n", "for eps in epsilons:\n", " history = Trial(model, criterion=nn.CrossEntropyLoss(), callbacks=[FGSA(eps)], metrics=['acc'], verbose=0).with_train_generator(test_loader).to(device).run(1)\n", " accuracies.append(history[0]['acc'])\n", " \n", "plt.figure(figsize=(5,5))\n", "plt.plot(epsilons, accuracies, \"*-\")\n", "plt.yticks(np.arange(0, 1.1, step=0.1))\n", "plt.xticks(np.arange(0, .35, step=0.05))\n", "plt.title(\"Accuracy vs Epsilon\")\n", "plt.xlabel(\"Epsilon\")\n", "plt.ylabel(\"Accuracy\")\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "accelerator": "GPU", "colab": { "name": "Untitled1.ipynb", "provenance": [], "version": "0.3.2" }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 1 }