{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "HBhEFSq8kzjr" }, "source": [ "# Variational autoencoder\n", "\n", "The goal of this exercise is to implement a VAE and apply it on the MNIST dataset. The code is adapted from the keras tutorial:\n", "\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5DnEuS6ekzjs" }, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import tensorflow as tf" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "zJUZl3rFAbtP" }, "outputs": [], "source": [ "# Fetch the MNIST data\n", "(X_train, t_train), (X_test, t_test) = tf.keras.datasets.mnist.load_data()\n", "print(\"Training data:\", X_train.shape, t_train.shape)\n", "print(\"Test data:\", X_test.shape, t_test.shape)\n", "\n", "# Normalize the values\n", "X_train = X_train.reshape(-1, 28, 28, 1).astype('float32') / 255.\n", "X_test = X_test.reshape(-1, 28, 28, 1).astype('float32') / 255.\n", "\n", "# Mean removal\n", "X_mean = np.mean(X_train, axis=0)\n", "X_train -= X_mean\n", "X_test -= X_mean\n", "\n", "# One-hot encoding\n", "T_train = tf.keras.utils.to_categorical(t_train, 10)\n", "T_test = tf.keras.utils.to_categorical(t_test, 10)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "9te9HSqluMVQ" }, "source": [ "As a reminder, a VAE is composed of two parts:\n", "\n", "* The encoder $q_\\varphi(\\mathbf{z} | \\mathbf{x})$ representing the probability distribution $\\mathcal{N}(\\mu_\\mathbf{x}, \\sigma_\\mathbf{x})$ of the latent representation $\\mathbf{z}$.\n", "* The decoder $p_\\theta(\\mathbf{x} | \\mathbf{z})$ reconstructing the input based on a sampled latent representation $\\mathbf{z}$.\n", "\n", "Two fundamental aspects of a VAE are not standard in keras:\n", "\n", "1. The sampling layer $\\mathbf{z} \\sim \\mathcal{N}(\\mu_\\mathbf{x}, \\sigma_\\mathbf{x})$ using the reparameterization trick.\n", "2. 
The VAE loss:\n", "\n", "$$\n", " \\mathcal{L}(\\theta, \\varphi) = \\mathbb{E}_{\\mathbf{x} \\in \\mathcal{D}, \\xi \\sim \\mathcal{N}(0, 1)} [ - \\log p_\\theta(\\mathbf{\\mu_x} + \\mathbf{\\sigma_x} \\, \\xi) + \\dfrac{1}{2} \\, \\sum_{k=1}^K (\\mathbf{\\sigma_x^2} + \\mathbf{\\mu_x}^2 -1 - \\log \\mathbf{\\sigma_x^2})]\n", "$$\n", "\n", "This will force us to dive a bit deeper into the mechanics of tensorflow, but it is not that difficult since the release of tensorflow 2.0 and the eager execution mode." ] }, { "cell_type": "markdown", "metadata": { "id": "eqyKHvU6t2Bw" }, "source": [ "## Gradient tapes: redefining the learning procedure\n", "\n", "Let's first have a look at how to define custom losses. There is an easier way to define custom losses with keras, but we will need this slightly more complicated variant for the VAE.\n", "\n", "Let's reuse the CNN you implemented last time using the functional API on MNIST, but without compiling it yet:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Wy8ybEYykzju" }, "outputs": [], "source": [ "def create_model():\n", "\n", "    inputs = tf.keras.layers.Input((28, 28, 1))\n", "\n", "    x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='valid')(inputs)\n", "    x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)\n", "    x = tf.keras.layers.Dropout(0.5)(x)\n", "\n", "    x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='valid')(x)\n", "    x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)\n", "    x = tf.keras.layers.Dropout(0.5)(x)\n", "\n", "    x = tf.keras.layers.Flatten()(x)\n", "\n", "    x = tf.keras.layers.Dense(150, activation='relu')(x)\n", "    x = tf.keras.layers.Dropout(0.5)(x)\n", "\n", "    outputs = tf.keras.layers.Dense(10, activation='softmax')(x)\n", "\n", "    model = tf.keras.Model(inputs, outputs)\n", "    print(model.summary())\n", "\n", "    return model\n" ] }, { "cell_type": "markdown", "metadata": { "id": "nXfdPvmh55tB" }, "source": [ "In order to have access to the 
internals of the training procedure, one option is to subclass `tf.keras.Model` and redefine the `train_step` and (optionally) `test_step` methods.\n", "\n", "The following cell redefines a model for the previous CNN and minimizes the categorical cross-entropy while tracking the loss and accuracy, so it is equivalent to:\n", "\n", "```python\n", "model.compile(\n", "    loss=\"categorical_crossentropy\",\n", "    optimizer=optimizer,\n", "    metrics=['accuracy'])\n", "```\n", "\n", "Have a look at the code; we will go through it step by step afterwards." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YUtBNjFKsiNo" }, "outputs": [], "source": [ "class CNN(tf.keras.Model):\n", "\n", "    def __init__(self):\n", "        super(CNN, self).__init__()\n", "\n", "        # Model\n", "        self.model = create_model()\n", "\n", "        # Metrics\n", "        self.loss_tracker = tf.keras.metrics.Mean(name=\"loss\")\n", "        self.accuracy_tracker = tf.keras.metrics.Accuracy(name=\"accuracy\")\n", "\n", "    @property\n", "    def metrics(self):\n", "        \"Track the loss and accuracy\"\n", "        return [self.loss_tracker, self.accuracy_tracker]\n", "\n", "    def train_step(self, data):\n", "\n", "        # Get the data of the minibatch\n", "        X, t = data\n", "\n", "        # Use GradientTape to record everything we need to compute the gradient\n", "        with tf.GradientTape() as tape:\n", "\n", "            # Prediction using the model\n", "            y = self.model(X, training=True)\n", "\n", "            # Cross-entropy loss\n", "            loss = tf.reduce_mean(\n", "                tf.reduce_sum(\n", "                    - t * tf.math.log(y), # Cross-entropy\n", "                    axis=1 # First index is the batch size, the second is the classes\n", "                )\n", "            )\n", "\n", "        # Compute gradients\n", "        grads = tape.gradient(loss, self.trainable_weights)\n", "\n", "        # Apply gradients using the optimizer\n", "        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))\n", "\n", "        # Update metrics\n", "        self.loss_tracker.update_state(loss)\n", "        true_class = 
tf.reshape(tf.argmax(t, axis=1), shape=(-1, 1))\n", "        predicted_class = tf.reshape(tf.argmax(y, axis=1), shape=(-1, 1))\n", "        self.accuracy_tracker.update_state(true_class, predicted_class)\n", "\n", "        # Return a dict mapping metric names to current value\n", "        return {\"loss\": self.loss_tracker.result(), 'accuracy': self.accuracy_tracker.result()}\n", "\n", "    def test_step(self, data):\n", "\n", "        # Get data\n", "        X, t = data\n", "\n", "        # Prediction\n", "        y = self.model(X, training=False)\n", "\n", "        # Loss\n", "        loss = tf.reduce_mean(\n", "            tf.reduce_sum(\n", "                - t * tf.math.log(y), # Cross-entropy\n", "                axis=1\n", "            )\n", "        )\n", "\n", "        # Update metrics\n", "        self.loss_tracker.update_state(loss)\n", "        true_class = tf.reshape(tf.argmax(t, axis=1), shape=(-1, 1))\n", "        predicted_class = tf.reshape(tf.argmax(y, axis=1), shape=(-1, 1))\n", "        self.accuracy_tracker.update_state(true_class, predicted_class)\n", "\n", "        # Return a dict mapping metric names to current value\n", "        return {\"loss\": self.loss_tracker.result(), 'accuracy': self.accuracy_tracker.result()}\n", " " ] }, { "cell_type": "markdown", "metadata": { "id": "jUlGEPbtIAJu" }, "source": [ "The constructor of the new `CNN` class creates the model defined by `create_model()` and stores it as an attribute.\n", "\n", "*Note:* it would actually be more logical to create the layers directly here, as we now have a model containing a model, but this is simpler for the VAE architecture.\n", "\n", "The constructor also defines the metrics that should be tracked when training. Here we track the loss and accuracy of the model, using objects of `tf.keras.metrics` (check the `tf.keras.metrics` documentation for a list of metrics you can track).\n", "\n", "The metrics are furthermore declared in the `metrics` property, so that you can now avoid passing `metrics=['accuracy']` to `compile()`. By default, a `Model` only tracks `'loss'`. 
\n", "\n", "```python\n", "class CNN(tf.keras.Model):\n", "\n", "    def __init__(self):\n", "        super(CNN, self).__init__()\n", "\n", "        # Model\n", "        self.model = create_model()\n", "\n", "        # Metrics\n", "        self.loss_tracker = tf.keras.metrics.Mean(name=\"loss\")\n", "        self.accuracy_tracker = tf.keras.metrics.Accuracy(name=\"accuracy\")\n", "\n", "    @property\n", "    def metrics(self):\n", "        \"Track the loss and accuracy\"\n", "        return [self.loss_tracker, self.accuracy_tracker]\n", "```\n", "\n", "The training procedure is defined in the `train_step(data)` method of the class.\n", "\n", "```python\n", "    def train_step(self, data):\n", "\n", "        # Get the data of the minibatch\n", "        X, t = data\n", "```\n", "\n", "`data` is a minibatch of data iteratively passed by `model.fit()`. `X` and `t` are **tensors** (multi-dimensional arrays) representing the inputs and targets. On MNIST, `X` has the shape `(batch_size, 28, 28, 1)` and `t` is `(batch_size, 10)`. The rest of the method defines the loss function, computes its gradient w.r.t. the learnable parameters and passes it to the optimizer to update their values.\n", "\n", "To get the output of the network on the minibatch, one simply has to call:\n", "\n", "```python\n", "y = self.model(X)\n", "```\n", "\n", "which returns a `(batch_size, 10)` tensor. However, this forward pass does not keep in memory the activity of the hidden layers: all it cares about is the prediction. 
But when applying backpropagation, you need this internal information to compute the gradient.\n", "\n", "In tensorflow 2.x, you can force the model to record internal activity using the eager execution mode and **gradient tapes** (as in the tape of an audio recorder):\n", "\n", "```python\n", "with tf.GradientTape() as tape:\n", "    y = self.model(X, training=True)\n", "```\n", "\n", "It is not a big problem if you are not familiar with Python contexts: all you need to know is that the `tape` object will \"see\" everything that happens when calling `y = self.model(X, training=True)`, i.e. it will record the hidden activations in the model.\n", "\n", "The next thing to do inside the tape is to compute the loss of the model on the minibatch. Here we minimize the categorical cross-entropy:\n", "\n", "$$\\mathcal{L}(\\theta) = \\frac{1}{N} \\, \\sum_{i=1}^N \\sum_{j=1}^C - t^i_j \\, \\log y^i_j$$\n", "\n", "where $N$ is the batch size, $C$ the number of classes, $t^i_j$ the $j$-th element of the $i$-th target vector and $y^i_j$ the predicted probability for class $j$ and the $i$-th sample.\n", "\n", "We therefore need to take our two tensors `t` and `y` and compute that loss function, but recording everything (so inside the tape context).\n", "\n", "There are several ways to do that, for example by directly calling the built-in categorical cross-entropy object of keras on the data:\n", "\n", "```python\n", "loss = tf.keras.losses.CategoricalCrossentropy()(t, y)\n", "```\n", "\n", "Another way to do it is to realize that tensorflow tensors behave much like numpy arrays: you can apply mathematical operations (sum, element-wise multiplication, log, etc.) on them as if they were regular arrays (internally, that is another story...).\n", "\n", "You can for example add `t` and two times `y`, as they have the same shape:\n", "\n", "```python\n", "loss = t + 2.0 * y\n", "```\n", "\n", "`loss` would then be a tensor of the same shape. 
You can get the shape of a tensor with `tf.shape(loss)` just like in numpy.\n", "\n", "Mathematical operations are in the `tf.math` module, for example the log:\n", "\n", "```python\n", "loss = t + tf.math.log(y)\n", "```\n", "\n", "`*` is by default the element-wise multiplication:\n", "\n", "```python\n", "loss = - t * tf.math.log(y)\n", "```\n", "\n", "Here, `loss` is still a `(batch_size, 10)` tensor. We still need to sum over the 10 classes and take the mean over the minibatch to get a single number.\n", "\n", "Summing over the second dimension of this tensor can be done with `tf.reduce_sum`:\n", "\n", "```python\n", "loss = tf.reduce_sum(\n", "    - t * tf.math.log(y),\n", "    axis=1 # First index is the batch size, the second is the classes\n", ")\n", "```\n", "\n", "This gives us a vector with `batch_size` elements containing the individual losses for the minibatch. In order to compute its mean over the minibatch, we only need to call `tf.reduce_mean()`:\n", "\n", "```python\n", "loss = tf.reduce_mean(\n", "    tf.reduce_sum(\n", "        - t * tf.math.log(y),\n", "        axis=1\n", "    )\n", ")\n", "```\n", "\n", "That's it, we have redefined the categorical cross-entropy loss function on a minibatch using elementary numerical operations! Doing this inside the tape allows tensorflow to keep track of each sample of the minibatch individually: otherwise, it would not know how the loss (a single number) depends on each prediction $y^i$ and therefore on the parameters of the NN.\n", "\n", "Now that we have the loss function as a function of the trainable parameters of the NN on the minibatch, we can ask tensorflow for its gradient:\n", "\n", "\n", "```python\n", "grads = tape.gradient(loss, self.trainable_weights)\n", "```\n", "\n", "Backpropagation is still a one-liner. 
`self.trainable_weights` contains all weights and biases in the model, while `tape.gradient()` applies backpropagation to compute the gradient of the loss function w.r.t. them.\n", "\n", "We can then pass this gradient to the optimizer (SGD or Adam, which will be passed to `compile()`) so that it updates the parameters:\n", "\n", "```python\n", "self.optimizer.apply_gradients(zip(grads, self.trainable_weights))\n", "```\n", "\n", "Finally, we can update our metrics so that our custom loss and the accuracy are tracked during training:\n", "\n", "```python\n", "self.loss_tracker.update_state(loss)\n", "\n", "true_class = tf.reshape(tf.argmax(t, axis=1), shape=(-1, 1))\n", "predicted_class = tf.reshape(tf.argmax(y, axis=1), shape=(-1, 1))\n", "self.accuracy_tracker.update_state(true_class, predicted_class)\n", "```\n", "\n", "For the accuracy, we need to pass the class (predicted or ground truth), not the probabilities.\n", "\n", "The `test_step()` method does roughly the same as `train_step()`, except that it does not modify the parameters: it is called on the validation data in order to compute the metrics. As we do not learn, we do not actually need the tape.\n", "\n", "**Q:** Create the custom CNN model and train it on MNIST. When compiling the model, you only need to pass it the right optimizer, as the loss function and the metrics are already defined in the model. Check that you get the same results as last time." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-cYxx51P7j_b" }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "id": "Fah1owLjEVn1" }, "source": [ "**Q:** Redefine the model so that it minimizes the mean-square error $(t-y)^2$ instead of the cross-entropy. What happens?\n", "\n", "*Hint:* squaring a tensor element-wise is done by applying `**2` on it just like in numpy."
 ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yq-QlL88kzjv" }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "id": "I5kf4T_6EPOK" }, "source": [ "## Custom layers\n", "\n", "Keras layers take a tensor as input (the output of the previous layer on a minibatch) and transform it into another tensor, possibly using trainable parameters. As we have seen, tensorflow allows us to manipulate tensors and apply differentiable operations on them, so we could redefine the function computed by a keras layer using tensorflow operations.\n", "\n", "The following cell shows how to implement a dummy layer that takes a tensor $T$ as input (the first dimension is the batch size) and returns the tensor $\\exp(- \\lambda \\, T)$, $\\lambda$ being a fixed parameter. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9JShv9D6C-G3" }, "outputs": [], "source": [ "class ExponentialLayer(tf.keras.layers.Layer):\n", "    \"\"\"Layer performing element-wise exponentiation.\"\"\"\n", "\n", "    def __init__(self, factor=1.0):\n", "        super(ExponentialLayer, self).__init__()\n", "        self.factor = factor\n", "\n", "    def call(self, inputs):\n", "        return tf.exp(- self.factor*inputs)" ] }, { "cell_type": "markdown", "metadata": { "id": "yFS16GuNZy_W" }, "source": [ "`ExponentialLayer` inherits from `tf.keras.layers.Layer` and redefines the `call()` method that defines the forward pass. Here we simply return the corresponding tensor.\n", "\n", "The layer can then be used in a functional model directly:\n", "\n", "```python\n", "x = ExponentialLayer(factor=1.0)(x)\n", "```\n", "\n", "As we use tensorflow operators, it knows how to differentiate it when applying backpropagation.\n", "\n", "More information on how to create new layers can be found in the Keras documentation. 
For reference, this is how you would redefine a fully-connected layer without an activation function, using a trainable weight matrix and bias vector:\n", "\n", "```python\n", "class Linear(tf.keras.layers.Layer):\n", "    def __init__(self, units=32):\n", "        \"Number of neurons in the layer.\"\n", "        super(Linear, self).__init__()\n", "        self.units = units\n", "\n", "    def build(self, input_shape):\n", "        \"Create the weight matrix and bias vector once we know the shape of the previous layer.\"\n", "        self.w = self.add_weight(\n", "            shape=(input_shape[-1], self.units),\n", "            initializer=\"random_normal\",\n", "            trainable=True,\n", "        )\n", "        self.b = self.add_weight(\n", "            shape=(self.units,), initializer=\"random_normal\", trainable=True\n", "        )\n", "\n", "    def call(self, inputs):\n", "        \"Return X*W + b\"\n", "        return tf.matmul(inputs, self.w) + self.b\n", "```\n", "\n", "**Q:** Add the exponential layer to the CNN between the last FC layer and the output layer. Change the value of the parameter. Does it still work?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "P1Rde-_NC-I2" }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "id": "89Lq_oHoEUkv" }, "source": [ "## Variational autoencoder\n", "\n", "We are now ready to implement the VAE. We are going to redefine the training set, as we want pixel values to be between 0 and 1 (so that we can compute a cross-entropy). 
Therefore, we do not perform mean removal:\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-zr3uSFHF6AR" }, "outputs": [], "source": [ "# Fetch the MNIST data\n", "(X_train, t_train), (X_test, t_test) = tf.keras.datasets.mnist.load_data()\n", "print(\"Training data:\", X_train.shape, t_train.shape)\n", "print(\"Test data:\", X_test.shape, t_test.shape)\n", "\n", "# Normalize the values\n", "X_train = X_train.reshape(-1, 28, 28, 1).astype('float32') / 255.\n", "X_test = X_test.reshape(-1, 28, 28, 1).astype('float32') / 255.\n", "\n", "# One-hot encoding\n", "T_train = tf.keras.utils.to_categorical(t_train, 10)\n", "T_test = tf.keras.utils.to_categorical(t_test, 10)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "ZS1ygyfyfGdi" }, "source": [ "### Encoder\n", "\n", "The encoder can have any form; the only constraint is that it takes an input of shape $(28, 28, 1)$ and outputs two vectors $\\mu$ and $\\log \\sigma^2$ of size `latent_dim`, the parameters of the normal distribution representing the input. 
We are going to use only `latent_dim=2` latent dimensions, but let's make the code generic.\n", "\n", "For a network to have two outputs, one just needs to use the functional API to create the graph:\n", "\n", "```python\n", "# Previous layer\n", "x = tf.keras.layers.Dense(N, activation=\"relu\")(x)\n", "\n", "# First output takes input from x\n", "z_mean = tf.keras.layers.Dense(latent_dim)(x)\n", "\n", "# Second output also takes input from x \n", "z_log_var = tf.keras.layers.Dense(latent_dim)(x)\n", "```\n", "\n", "This would not be possible using the Sequential API, but is straightforward using the functional one, as you decide from where a layer takes its inputs.\n", "\n", "What we still need and is not standard in keras is a sampling layer that implements the **reparameterization trick**:\n", "\n", "$$\\mathbf{z} = \\mu + \\sigma \\, \\xi$$\n", "\n", "where $\\xi$ comes from the standard normal distribution $\\mathcal{N}(0, 1)$. \n", "\n", "For technical reasons, it is actually better when `z_log_var` represents $\\log \\sigma^2$ instead of $\\sigma$, as it can take both positive and negative values, while $\\sigma$ could only be strictly positive. \n", "\n", "$$\\text{z\\_log\\_var} = \\log \\, \\sigma^2$$\n", "\n", "$$\\sigma = \\exp \\dfrac{\\text{z\\_log\\_var}}{2}$$\n", "\n", "We therefore want a layer that computes:\n", "\n", "```python\n", "z = z_mean + tf.math.exp(0.5 * z_log_var) * xi\n", "```\n", "\n", "on the tensors of shape `(batch_size, latent_dim)`. 
To sample the standard normal distribution, you can use tensorflow:\n", "\n", "```python\n", "xi = tf.random.normal(shape=(batch_size, latent_dim), mean=0.0, stddev=1.0)\n", "```\n", "\n", "**Q:** Create a custom `SamplingLayer` layer that takes inputs from `z_mean` and `z_log_var`, being called like this:\n", "\n", "```python\n", "z = SamplingLayer()([z_mean, z_log_var])\n", "```\n", "\n", "In order to get each input separately, the `inputs` argument can be split:\n", "\n", "```python\n", "def call(self, inputs):\n", "    z_mean, z_log_var = inputs\n", "```\n", "\n", "The only difficulty is to pass the correct dimensions to `xi`, as you do not know the batch size yet. You can retrieve it using the shape of `z_mean`:\n", "\n", "```python\n", "batch_size = tf.shape(z_mean)[0]\n", "latent_dim = tf.shape(z_mean)[1]\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "uAxZRIiikzjw" }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "id": "P7pNBm7cnxvE" }, "source": [ "We can now create the encoder in a `create_encoder(latent_dim)` function that returns an uncompiled model.\n", "\n", "You can put what you want in the encoder as long as it takes a `(28, 28, 1)` input and returns the three layers `[z_mean, z_log_var, z]` (we need `z_mean` and `z_log_var` to define the loss; normally you would only need `z`):\n", "\n", "```python\n", "def create_encoder(latent_dim):\n", "\n", "    inputs = tf.keras.layers.Input(shape=(28, 28, 1))\n", "\n", "    # Stuff, with x being the last FC layer\n", "\n", "    z_mean = tf.keras.layers.Dense(latent_dim)(x)\n", "\n", "    z_log_var = tf.keras.layers.Dense(latent_dim)(x)\n", "\n", "    z = SamplingLayer()([z_mean, z_log_var])\n", "\n", "    model = tf.keras.Model(inputs, [z_mean, z_log_var, z])\n", "\n", "    print(model.summary())\n", "\n", "    return model\n", "```\n", "\n", "One suggestion would be to use two convolutional layers with a stride of 2 (replacing max-pooling) and one fully-connected layer with 
enough neurons, but you are free to choose.\n", "\n", "**Q:** Create the encoder." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1Th5tnzykzjw" }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "id": "vZ5dp9dGpeFX" }, "source": [ "The decoder is a bit more tricky. It takes the vector `z` as an input (`latent_dim=2` dimensions) and should output an image (28, 28, 1) with pixels normalized between 0 and 1. The output layer should therefore use the `'sigmoid'` transfer function:\n", "\n", "```python\n", "def create_decoder(latent_dim):\n", "\n", "    inputs = tf.keras.layers.Input(shape=(latent_dim,))\n", "\n", "    # Stuff, with x being a transposed convolution layer of shape (28, 28, N)\n", "\n", "    outputs = tf.keras.layers.Conv2DTranspose(1, (3, 3), activation=\"sigmoid\", padding=\"same\")(x)\n", "\n", "    model = tf.keras.Model(inputs, outputs)\n", "    print(model.summary())\n", "\n", "    return model\n", "```\n", "\n", "The decoder has to use **transposed convolutions** to upsample the tensors instead of downsampling them. Check the documentation of `Conv2DTranspose`.\n", "\n", "In order to build the decoder, you have to be careful when it comes to tensor shapes: the output must be **exactly** (28, 28, 1), not (26, 26, 1), otherwise you will not be able to compute the reconstruction loss. You need to be careful with the stride (upsampling ratio) and padding method ('same' or 'valid') of the layers you add. Do not hesitate to create dummy models and print their summary to see the shapes.\n", "\n", "Another trick is that you need to transform the vector `z` with `latent_dim=2` elements into a 3D tensor before applying transposed convolutions (i.e. the inverse of `Flatten()`). 
If, for example, you want a tensor of shape (7, 7, 64) as the input to the first transposed convolution, you could project the vector to a fully connected layer with `7*7*64` neurons:\n", "\n", "```python\n", "x = tf.keras.layers.Dense(7 * 7 * 64, activation=\"relu\")(inputs)\n", "```\n", "\n", "and reshape it to a (7, 7, 64) tensor:\n", "\n", "```python\n", "x = tf.keras.layers.Reshape((7, 7, 64))(x)\n", "```\n", "\n", "**Q:** Create the decoder. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "eo8iGMczkzjx" }, "outputs": [], "source": [] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "wTgM7unqtITF" }, "source": [ "**Q:** Create a custom `VAE` model (inheriting from `tf.keras.Model`) that:\n", "\n", "* takes the latent dimension as argument:\n", "\n", "```python\n", "vae = VAE(latent_dim=2)\n", "```\n", "\n", "* creates the encoder and decoder in the constructor.\n", "\n", "* tracks the reconstruction and KL losses as metrics.\n", "\n", "* does not use validation data (i.e., do not implement `test_step()` and do not provide any validation data to `fit()`).\n", "\n", "* computes the reconstruction loss using binary cross-entropy over all pixels of the reconstructed image:\n", "\n", "$$\\mathcal{L}_\\text{reconstruction}(\\theta) = \\frac{1}{N} \\sum_{i=1}^N \\sum_{w, h \\in \\text{pixels}} - t^i(w, h) \\, \\log y^i(w, h) - (1 - t^i(w, h)) \\, \\log(1 - y^i(w, h))$$\n", "\n", "where $t^i(w, h)$ is the pixel of coordinates $(w, h)$ (between 0 and 27) of the $i$-th image of the minibatch.\n", "\n", "* computes the KL divergence loss for the encoder, summed over the $K$ latent dimensions:\n", "\n", "$$\\mathcal{L}_\\text{KL}(\\theta) = \\frac{1}{2 N} \\sum_{i=1}^N \\sum_{k=1}^K (\\exp(\\text{z\\_log\\_var}^i_k) + (\\text{z\\_mean}^i_k)^2 - 1 - \\text{z\\_log\\_var}^i_k)$$\n", "\n", "* minimizes the total loss:\n", "\n", "$$\\mathcal{L}(\\theta) = \\mathcal{L}_\\text{reconstruction}(\\theta) + \\mathcal{L}_\\text{KL}(\\theta)$$\n", "\n", "Train it on the MNIST images for 30 epochs 
(or more) with the right batch size and a good optimizer (`history = vae.fit(X_train, X_train, epochs=30, batch_size=b)`). How do the losses evolve?\n", "\n", "*Hint:* for the reconstruction loss, you can implement the formula using tensorflow operations, or call `tf.keras.losses.binary_crossentropy(t, y)` directly.\n", "\n", "Do not worry if your reconstruction loss does not go to zero but stays in the hundreds: this is normal. Use the next cell to visualize the reconstructions.\n", "\n", "*Note:* The KL is expressed for a single sample as:\n", "\n", "$$\\mathcal{L}_\\text{KL}(\\theta) = \\dfrac{1}{2} \\, (\\mathbf{\\sigma_x^2} + \\mathbf{\\mu_x}^2 - 1 - \\log \\mathbf{\\sigma_x^2})$$\n", "\n", "With $\\text{z\\_log\\_var} = \\log \\, \\sigma^2$, i.e. $\\sigma = \\exp \\dfrac{\\text{z\\_log\\_var}}{2}$, this becomes:\n", "\n", "$$\\mathcal{L}_\\text{KL}(\\theta) = \\dfrac{1}{2} \\, (\\exp \\text{z\\_log\\_var} + \\mathbf{\\mu_x}^2 - 1 - \\text{z\\_log\\_var})$$" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "LAJ8hkAvkzjx" }, "outputs": [], "source": [] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "id": "clEs1g0ouP0D" }, "source": [ "**Q:** The following cell samples the latent space on a regular grid and reconstructs the corresponding images. It assumes that the decoder is stored at `vae.decoder`; adapt it otherwise. Comment on the generated samples. Observe in particular the smooth transitions between similar digits."
 ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RILX_Imckzjz" }, "outputs": [], "source": [ "def plot_latent_space(vae, n=30, figsize=15):\n", "    # display a n*n 2D manifold of digits\n", "    digit_size = 28\n", "    scale = 2.0\n", "    figure = np.zeros((digit_size * n, digit_size * n))\n", "    # linearly spaced coordinates corresponding to the 2D plot\n", "    # of digit classes in the latent space\n", "    grid_x = np.linspace(-scale, scale, n)\n", "    grid_y = np.linspace(-scale, scale, n)[::-1]\n", "\n", "    for i, yi in enumerate(grid_y):\n", "        for j, xi in enumerate(grid_x):\n", "            z_sample = np.array([[xi, yi]])\n", "            x_decoded = vae.decoder.predict(z_sample)\n", "            digit = x_decoded[0].reshape(digit_size, digit_size)\n", "            figure[\n", "                i * digit_size : (i + 1) * digit_size,\n", "                j * digit_size : (j + 1) * digit_size,\n", "            ] = digit\n", "\n", "    plt.figure(figsize=(figsize, figsize))\n", "    start_range = digit_size // 2\n", "    end_range = n * digit_size + start_range\n", "    pixel_range = np.arange(start_range, end_range, digit_size)\n", "    sample_range_x = np.round(grid_x, 1)\n", "    sample_range_y = np.round(grid_y, 1)\n", "    plt.xticks(pixel_range, sample_range_x)\n", "    plt.yticks(pixel_range, sample_range_y)\n", "    plt.xlabel(\"z[0]\")\n", "    plt.ylabel(\"z[1]\")\n", "    plt.imshow(figure, cmap=\"Greys_r\")\n", "    plt.show()\n", "\n", "\n", "plot_latent_space(vae)" ] }, { "cell_type": "markdown", "metadata": { "id": "uZAf3aIOuwr3" }, "source": [ "**Q:** The following cell visualizes the latent representation for the training data, using different colors for the digits. What do you think?"
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0LsC8Aw8kzj0" }, "outputs": [], "source": [ "def plot_label_clusters(vae, data, labels):\n", " # display a 2D plot of the digit classes in the latent space\n", " z_mean, _, _ = vae.encoder.predict(data)\n", " plt.figure(figsize=(12, 10))\n", " plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)\n", " plt.colorbar()\n", " plt.xlabel(\"z[0]\")\n", " plt.ylabel(\"z[1]\")\n", " plt.show()\n", "\n", "\n", "plot_label_clusters(vae, X_train, t_train)" ] } ], "metadata": { "accelerator": "GPU", "colab": { "collapsed_sections": [], "name": "12-VAE.ipynb", "provenance": [] }, "kernelspec": { "display_name": "Python 3.9.13 ('base')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" }, "nikola": { "category": "", "date": "2017-07-21 18:38:07 UTC+10:00", "description": "", "link": "", "slug": "variational-inference-with-implicit-approximate-inference-models-wip-pt-8", "tags": "", "title": "Variational Inference with Implicit Approximate Inference Models (WIP Pt. 8)", "type": "text" }, "vscode": { "interpreter": { "hash": "3d24234067c217f49dc985cbc60012ce72928059d528f330ba9cb23ce737906d" } } }, "nbformat": 4, "nbformat_minor": 0 }