{ "cells": [ { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "MhoQ0WE77laV" }, "source": [ "##### Copyright 2019 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "cellView": "form", "colab": {}, "colab_type": "code", "id": "_ckMIh7O7s6D" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "jYysdyb-CaWM" }, "source": [ "# tf.distribute.Strategy with training loops" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "S5Uhzt6vVIB2" }, "source": [ "\u003ctable class=\"tfo-notebook-buttons\" align=\"left\"\u003e\n", " \u003ctd\u003e\n", " \u003ca target=\"_blank\" href=\"https://www.tensorflow.org/beta/tutorials/distribute/training_loops\"\u003e\u003cimg src=\"https://www.tensorflow.org/images/tf_logo_32px.png\" /\u003eView on TensorFlow.org\u003c/a\u003e\n", " \u003c/td\u003e\n", " \u003ctd\u003e\n", " \u003ca target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/r2/tutorials/distribute/training_loops.ipynb\"\u003e\u003cimg src=\"https://www.tensorflow.org/images/colab_logo_32px.png\" /\u003eRun in Google Colab\u003c/a\u003e\n", " \u003c/td\u003e\n", " \u003ctd\u003e\n", " \u003ca target=\"_blank\" href=\"https://github.com/tensorflow/docs/blob/master/site/en/r2/tutorials/distribute/training_loops.ipynb\"\u003e\u003cimg src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" /\u003eView source on GitHub\u003c/a\u003e\n", " \u003c/td\u003e\n", " \u003ctd\u003e\n", " \u003ca href=\"https://storage.googleapis.com/tensorflow_docs/site/en/r2/tutorials/distribute/training_loops.ipynb\"\u003e\u003cimg src=\"https://www.tensorflow.org/images/download_logo_32px.png\" /\u003eDownload notebook\u003c/a\u003e\n", " \u003c/td\u003e\n", "\u003c/table\u003e" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "FbVhjPpzn6BM" }, "source": [ "This tutorial demonstrates how to use [`tf.distribute.Strategy`](https://www.tensorflow.org/guide/distribute_strategy) with custom training loops. We will train a simple CNN model on the fashion MNIST dataset. The fashion MNIST dataset contains 60000 train images of size 28 x 28 and 10000 test images of size 28 x 28.\n", "\n", "We are using custom training loops to train our model because they give us flexibility and a greater control on training. Moreover, it is easier to debug the model and the training loop." 
] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "dzLKpmZICaWN" }, "outputs": [], "source": [ "from __future__ import absolute_import, division, print_function, unicode_literals\n", "\n", "# Import TensorFlow\n", "!pip install tensorflow-gpu==2.0.0-beta1\n", "import tensorflow as tf\n", "\n", "# Helper libraries\n", "import numpy as np\n", "import os\n", "\n", "print(tf.__version__)" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "MM6W__qraV55" }, "source": [ "## Download the fashion MNIST dataset" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "7MqDQO0KCaWS" }, "outputs": [], "source": [ "fashion_mnist = tf.keras.datasets.fashion_mnist\n", "\n", "(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()\n", "\n", "# Adding a dimension to the array -\u003e new shape == (28, 28, 1)\n", "# We are doing this because the first layer in our model is a convolutional\n", "# layer and it requires a 4D input (batch_size, height, width, channels).\n", "# batch_size dimension will be added later on.\n", "train_images = train_images[..., None]\n", "test_images = test_images[..., None]\n", "\n", "# Getting the images in [0, 1] range.\n", "train_images = train_images / np.float32(255)\n", "test_images = test_images / np.float32(255)" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "4AXoHhrsbdF3" }, "source": [ "## Create a strategy to distribute the variables and the graph" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "5mVuLZhbem8d" }, "source": [ "How does `tf.distribute.MirroredStrategy` strategy work?\n", "\n", "* All the variables and the model graph is replicated on the replicas.\n", "* Input is evenly distributed across the replicas.\n", "* Each replica calculates the loss and gradients for the input it received.\n", "* The gradients are synced across all the replicas by summing them.\n", "* After the sync, the same update is made to the copies of the variables on each replica.\n", "\n", "Note: You can put all the code below inside a single scope. We are dividing it into several code cells for illustration purposes.\n" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "F2VeZUWUj5S4" }, "outputs": [], "source": [ "# If the list of devices is not specified in the\n", "# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.\n", "strategy = tf.distribute.MirroredStrategy()" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "ZngeM_2o0_JO" }, "outputs": [], "source": [ "print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "k53F5I_IiGyI" }, "source": [ "## Setup input pipeline" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "0Qb6nDgxiN_n" }, "source": [ "Export the graph and the variables to the platform-agnostic SavedModel format. After your model is saved, you can load it with or without the scope." 
] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "jwJtsCQhHK-E" }, "outputs": [], "source": [ "BUFFER_SIZE = len(train_images)\n", "\n", "BATCH_SIZE_PER_REPLICA = 64\n", "GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync\n", "\n", "EPOCHS = 10" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "J7fj3GskHC8g" }, "source": [ "Create the distributed datasets inside a `strategy.scope`:" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "WYrMNNDhAvVl" }, "outputs": [], "source": [ "with strategy.scope():\n", "\n", " train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) \n", " train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)\n", " \n", " test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) \n", " test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "bAXAo_wWbWSb" }, "source": [ "## Create the model\n", "\n", "Create a model using `tf.keras.Sequential`. You can also use the Model Subclassing API to do this." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "9ODch-OFCaW4" }, "outputs": [], "source": [ "def create_model():\n", " model = tf.keras.Sequential([\n", " tf.keras.layers.Conv2D(32, 3, activation='relu'),\n", " tf.keras.layers.MaxPooling2D(),\n", " tf.keras.layers.Conv2D(64, 3, activation='relu'),\n", " tf.keras.layers.MaxPooling2D(),\n", " tf.keras.layers.Flatten(),\n", " tf.keras.layers.Dense(64, activation='relu'),\n", " tf.keras.layers.Dense(10, activation='softmax')\n", " ])\n", "\n", " return model" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "9iagoTBfijUz" }, "outputs": [], "source": [ "# Create a checkpoint directory to store the checkpoints.\n", "checkpoint_dir = './training_checkpoints'\n", "checkpoint_prefix = os.path.join(checkpoint_dir, \"ckpt\")" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "e-wlFFZbP33n" }, "source": [ "## Define the loss function\n", "\n", "Normally, on a single machine with 1 GPU/CPU, loss is divided by the number of examples in the batch of input.\n", "\n", "*So, how should the loss be calculated when using a `tf.distribute.Strategy`?*\n", "\n", "* For an example, let's say you have 4 GPU's and a batch size of 64. One batch of input is distributed\n", "across the replicas (4 GPUs), each replica getting an input of size 16.\n", "\n", "* The model on each replica does a forward pass with its respective input and calculates the loss. Now, instead of dividing the loss by the number of examples in its respective input (BATCH_SIZE_PER_REPLICA = 16), the loss should be divided by the GLOBAL_BATCH_SIZE (64).\n", "\n", "*Why do this?*\n", "\n", "* This needs to be done because after the gradients are calculated on each replica, they are synced across the replicas by **summing** them.\n", "\n", "*How to do this in TensorFlow?*\n", "* If you're writing a custom training loop, as in this tutorial, you should sum the per example losses and divide the sum by the GLOBAL_BATCH_SIZE: \n", "`scale_loss = tf.reduce_sum(loss) * (1. 
/ GLOBAL_BATCH_SIZE)`\n", "or you can use `tf.nn.compute_average_loss` which takes the per example loss,\n", "optional sample weights, and GLOBAL_BATCH_SIZE as arguments and returns the scaled loss.\n", "\n", "* If you are using regularization losses in your model then you need to scale\n", "the loss value by number of replicas. You can do this by using the `tf.nn.scale_regularization_loss` function.\n", "\n", "* Using `tf.reduce_mean` is not recommended. Doing so divides the loss by actual per replica batch size which may vary step to step.\n", "\n", "* This reduction and scaling is done automatically in keras `model.compile` and `model.fit`\n", "\n", "* If using `tf.keras.losses` classes (as in the example below), the loss reduction needs to be explicitly specified to be one of `NONE` or `SUM`. `AUTO` and `SUM_OVER_BATCH_SIZE` are disallowed when used with `tf.distribute.Strategy`. `AUTO` is disallowed because the user should explicitly think about what reduction they want to make sure it is correct in the distributed case. `SUM_OVER_BATCH_SIZE` is disallowed because currently it would only divide by per replica batch size, and leave the dividing by number of replicas to the user, which might be easy to miss. So instead we ask the user do the reduction themselves explicitly." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "R144Wci782ix" }, "outputs": [], "source": [ "with strategy.scope():\n", " # Set reduction to `none` so we can do the reduction afterwards and divide by\n", " # global batch size.\n", " loss_object = tf.keras.losses.SparseCategoricalCrossentropy(\n", " reduction=tf.keras.losses.Reduction.NONE)\n", " # or loss_fn = tf.keras.losses.sparse_categorical_crossentropy\n", " def compute_loss(labels, predictions):\n", " per_example_loss = loss_object(labels, predictions)\n", " return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "w8y54-o9T2Ni" }, "source": [ "## Define the metrics to track loss and accuracy\n", "\n", "These metrics track the test loss and training and test accuracy. You can use `.result()` to get the accumulated statistics at any time." 
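,
"\n",
"\n",
"As a quick standalone illustration (the labels and predictions below are made-up toy values, not outputs of the model in this tutorial), a stateful Keras metric keeps accumulating values across `update_state` calls until you call `reset_states`:" ] },
{ "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code" }, "outputs": [], "source": [ "# Standalone sketch with made-up values: a metric accumulates state across\n", "# `update_state` calls and reports it via `result()`.\n", "toy_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()\n", "\n", "toy_accuracy.update_state([1, 2], [[0.1, 0.8, 0.1], [0.2, 0.2, 0.6]])\n", "print(toy_accuracy.result().numpy())  # 1.0 -- both toy predictions are correct\n", "\n", "toy_accuracy.update_state([0], [[0.1, 0.8, 0.1]])\n", "print(toy_accuracy.result().numpy())  # ~0.67 -- running accuracy over all 3 examples\n", "\n", "toy_accuracy.reset_states()  # clears the accumulated statistics"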
] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "zt3AHb46Tr3w" }, "outputs": [], "source": [ "with strategy.scope():\n", " test_loss = tf.keras.metrics.Mean(name='test_loss')\n", "\n", " train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " name='train_accuracy')\n", " test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " name='test_accuracy')" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "iuKuNXPORfqJ" }, "source": [ "## Training loop" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "OrMmakq5EqeQ" }, "outputs": [], "source": [ "# model and optimizer must be created under `strategy.scope`.\n", "with strategy.scope():\n", " model = create_model()\n", "\n", " optimizer = tf.keras.optimizers.Adam()\n", "\n", " checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "3UX43wUu04EL" }, "outputs": [], "source": [ "with strategy.scope():\n", " def train_step(inputs):\n", " images, labels = inputs\n", "\n", " with tf.GradientTape() as tape:\n", " predictions = model(images, training=True)\n", " loss = compute_loss(labels, predictions)\n", "\n", " gradients = tape.gradient(loss, model.trainable_variables)\n", " optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n", "\n", " train_accuracy.update_state(labels, predictions)\n", " return loss \n", "\n", " def test_step(inputs):\n", " images, labels = inputs\n", "\n", " predictions = model(images, training=False)\n", " t_loss = loss_object(labels, predictions)\n", "\n", " test_loss.update_state(t_loss)\n", " test_accuracy.update_state(labels, predictions)" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "gX975dMSNw0e" }, "outputs": [], "source": [ "with strategy.scope():\n", " # `experimental_run_v2` replicates the provided computation and runs it\n", " # with the distributed input.\n", " @tf.function\n", " def distributed_train_step(dataset_inputs):\n", " per_replica_losses = strategy.experimental_run_v2(train_step,\n", " args=(dataset_inputs,))\n", " return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,\n", " axis=None)\n", " \n", " @tf.function\n", " def distributed_test_step(dataset_inputs):\n", " return strategy.experimental_run_v2(test_step, args=(dataset_inputs,))\n", "\n", " for epoch in range(EPOCHS):\n", " # TRAIN LOOP\n", " total_loss = 0.0\n", " num_batches = 0\n", " for x in train_dist_dataset:\n", " total_loss += distributed_train_step(x)\n", " num_batches += 1\n", " train_loss = total_loss / num_batches\n", "\n", " # TEST LOOP\n", " for x in test_dist_dataset:\n", " distributed_test_step(x)\n", "\n", " if epoch % 2 == 0:\n", " checkpoint.save(checkpoint_prefix)\n", "\n", " template = (\"Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, \"\n", " \"Test Accuracy: {}\")\n", " print (template.format(epoch+1, train_loss,\n", " train_accuracy.result()*100, test_loss.result(),\n", " test_accuracy.result()*100))\n", "\n", " test_loss.reset_states()\n", " train_accuracy.reset_states()\n", " test_accuracy.reset_states()" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "Z1YvXqOpwy08" }, "source": [ "Things to note in the example above:\n", "\n", "* We are iterating over the `train_dist_dataset` and `test_dist_dataset` using a `for x in ...` construct.\n", 
"* The scaled loss is the return value of the `distributed_train_step`. This value is aggregated across replicas using the `tf.distribute.Strategy.reduce` call and then across batches by summing the return value of the `tf.distribute.Strategy.reduce` calls.\n", "* `tf.keras.Metrics` should be updated inside `train_step` and `test_step` that gets executed by `tf.distribute.Strategy.experimental_run_v2`.\n", "*`tf.distribute.Strategy.experimental_run_v2` returns results from each local replica in the strategy, and there are multiple ways to consume this result. You can do `tf.distribute.Strategy.reduce` to get an aggregated value. You can also do `tf.distribute.Strategy.experimental_local_results` to get the list of values contained in the result, one per local replica.\n" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "-q5qp31IQD8t" }, "source": [ "## Restore the latest checkpoint and test" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "WNW2P00bkMGJ" }, "source": [ "A model checkpointed with a `tf.distribute.Strategy` can be restored with or without a strategy." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "pg3B-Cw_cn3a" }, "outputs": [], "source": [ "eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " name='eval_accuracy')\n", "\n", "new_model = create_model()\n", "new_optimizer = tf.keras.optimizers.Adam()\n", "\n", "test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "7qYii7KUYiSM" }, "outputs": [], "source": [ "@tf.function\n", "def eval_step(images, labels):\n", " predictions = new_model(images, training=False)\n", " eval_accuracy(labels, predictions)" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "LeZ6eeWRoUNq" }, "outputs": [], "source": [ "checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)\n", "checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))\n", "\n", "for images, labels in test_dataset:\n", " eval_step(images, labels)\n", "\n", "print ('Accuracy after restoring the saved model without strategy: {}'.format(\n", " eval_accuracy.result()*100))" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "EbcI87EEzhzg" }, "source": [ "## Alternate ways of iterating over a dataset\n", "\n", "### Using iterators\n", "\n", "If you want to iterate over a given number of steps and not through the entire dataset you can create an iterator using the `iter` call and explicity call `next` on the iterator. You can choose to iterate over the dataset both inside and outside the tf.function. 
Here is a small snippet demonstrating iteration of the dataset outside the tf.function using an iterator.\n" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "7c73wGC00CzN" }, "outputs": [], "source": [ "with strategy.scope():\n", " for _ in range(EPOCHS):\n", " total_loss = 0.0\n", " num_batches = 0\n", " train_iter = iter(train_dist_dataset)\n", "\n", " for _ in range(10):\n", " total_loss += distributed_train_step(next(train_iter))\n", " num_batches += 1\n", " average_train_loss = total_loss / num_batches\n", "\n", " template = (\"Epoch {}, Loss: {}, Accuracy: {}\")\n", " print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))\n", " train_accuracy.reset_states()" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "GxVp48Oy0m6y" }, "source": [ "### Iterating inside a tf.function\n", "You can also iterate over the entire input `train_dist_dataset` inside a tf.function using the `for x in ...` construct or by creating iterators like we did above. The example below demonstrates wrapping one epoch of training in a tf.function and iterating over `train_dist_dataset` inside the function." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code", "id": "-REzmcXv00qm" }, "outputs": [], "source": [ "with strategy.scope():\n", " @tf.function\n", " def distributed_train_epoch(dataset):\n", " total_loss = 0.0\n", " num_batches = 0\n", " for x in dataset:\n", " per_replica_losses = strategy.experimental_run_v2(train_step,\n", " args=(x,))\n", " total_loss += strategy.reduce(\n", " tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)\n", " num_batches += 1\n", " return total_loss / tf.cast(num_batches, dtype=tf.float32)\n", "\n", " for epoch in range(EPOCHS):\n", " train_loss = distributed_train_epoch(train_dist_dataset)\n", "\n", " template = (\"Epoch {}, Loss: {}, Accuracy: {}\")\n", " print (template.format(epoch+1, train_loss, train_accuracy.result()*100))\n", "\n", " train_accuracy.reset_states()" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "MuZGXiyC7ABR" }, "source": [ "### Tracking training loss across replicas\n", "\n", "Note: As a general rule, you should use `tf.keras.Metrics` to track per-sample values and avoid values that have been aggregated within a replica.\n", "\n", "We do *not* recommend using `tf.metrics.Mean` to track the training loss across different replicas, because of the loss scaling computation that is carried out.\n", "\n", "For example, if you run a training job with the following characteristics:\n", "* Two replicas\n", "* Two samples are processed on each replica\n", "* Resulting loss values: [2, 3] and [4, 5] on each replica\n", "* Global batch size = 4\n", "\n", "With loss scaling, you calculate the per-sample value of loss on each replica by adding the loss values, and then dividing by the global batch size. In this case: `(2 + 3) / 4 = 1.25` and `(4 + 5) / 4 = 2.25`. \n", "\n", "If you use `tf.metrics.Mean` to track loss across the two replicas, the result is different. In this example, you end up with a `total` of 3.50 and `count` of 2, which results in `total`/`count` = 1.75 when `result()` is called on the metric. Loss calculated with `tf.keras.Metrics` is scaled by an additional factor that is equal to the number of replicas in sync." 
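,
"\n",
"\n",
"The cell below is a minimal sketch of this arithmetic using the hypothetical loss values above (two replicas, two samples each, global batch size of 4); it only illustrates where the extra factor comes from and is not part of the training loop:" ] },
{ "cell_type": "code", "execution_count": 0, "metadata": { "colab": {}, "colab_type": "code" }, "outputs": [], "source": [ "# Sketch of the example above (hypothetical values, two replicas in sync):\n", "# per-sample losses [2, 3] on replica 0 and [4, 5] on replica 1,\n", "# global batch size 4.\n", "replica_losses = [tf.constant([2., 3.]), tf.constant([4., 5.])]\n", "\n", "scaled = [tf.nn.compute_average_loss(loss, global_batch_size=4)\n", "          for loss in replica_losses]\n", "print([s.numpy() for s in scaled])   # [1.25, 2.25]; summing across replicas gives 3.5\n", "\n", "mean_metric = tf.keras.metrics.Mean()\n", "for s in scaled:\n", "  mean_metric.update_state(s)\n", "print(mean_metric.result().numpy())  # 1.75 == 3.5 / 2, an extra division by the\n", "                                     # number of replicas"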
] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "6hEJNsokjOKs" }, "source": [ "## Next steps\n", "\n", "Try out the new `tf.distribute.Strategy` API on your models." ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "training_loops.ipynb", "private_outputs": true, "provenance": [], "toc_visible": true, "version": "0.3.2" }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }