{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Pretrained GAN" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import fastai\n", "from fastai import *\n", "from fastai.vision import *\n", "from fastai.callbacks import *\n", "\n", "from torchvision.models import vgg16_bn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "path = untar_data(URLs.PETS)\n", "path_hr = path/'images'\n", "path_lr = path/'crappy'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "torch.cuda.set_device(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Critic data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Prepare the input data by crappifying images." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def crappify(fn,i):\n", " dest = path_lr/fn.relative_to(path_hr)\n", " dest.parent.mkdir(parents=True, exist_ok=True)\n", " img = PIL.Image.open(fn)\n", " targ_sz = resize_to(img, 96, use_min=True)\n", " img = img.resize(targ_sz, resample=PIL.Image.BILINEAR).convert('RGB')\n", " img.save(dest, quality=random.randint(10,70))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Uncomment the first time you run this notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#il = ImageItemList.from_folder(path_hr)\n", "#parallel(crappify, il.items)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For gradual resizing we can change the commented line here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#bs,size = 32,128\n", "#bs,size = 8,256\n", "bs,size = 16,160\n", "arch = models.resnet34\n", "classes = ['crappy', 'images']\n", "src = ImageItemList.from_folder(path, include=classes).random_split_by_pct(0.1, seed=42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ll = src.label_from_folder(classes=classes)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_crit = (ll.transform(get_transforms(max_zoom=2.), size=size)\n", " .databunch(bs=bs).normalize(imagenet_stats))\n", "\n", "data_crit.c = 3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_crit.show_batch(rows=4, ds_type=DatasetType.Valid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train critic" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "conv_args = dict(leaky=0.2, norm_type=NormType.Spectral)\n", "\n", "def conv(ni:int, nf:int, ks:int=3, stride:int=1, **kwargs):\n", " return conv_layer(ni, nf, ks=ks, stride=stride, **conv_args, **kwargs)\n", "\n", "def critic(n_channels:int=3, nf:int=128, n_blocks:int=3, p:int=0.25):\n", " layers = [\n", " conv(n_channels, nf, ks=4, stride=2),\n", " nn.Dropout2d(p/2),\n", " # Removing conv_args because spectral norm makes this slow\n", " res_block(nf, dense=True)] # , **conv_args)]\n", " nf *= 2 # after dense block\n", " for i in range(n_blocks):\n", " layers += [\n", " nn.Dropout2d(p),\n", " conv(nf, nf*2, ks=4, stride=2, self_attention=(i==0))]\n", " nf *= 2\n", " layers += [\n", " conv(nf, 1, ks=4, bias=False, padding=0, use_activ=False),\n", " Flatten()]\n", " return nn.Sequential(*layers)" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "Module to apply the loss function to every element of the last features before taking the mean." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class AdaptiveLoss(nn.Module):\n", " def __init__(self, crit):\n", " super().__init__()\n", " self.crit = crit\n", "\n", " def forward(self, output, target):\n", " return self.crit(output, target[:,None].expand_as(output).float())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Specific accuracy metric." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def accuracy_thresh_expand(y_pred:Tensor, y_true:Tensor, thresh:float=0.5, sigmoid:bool=True)->Rank0Tensor:\n", " \"Compute accuracy when `y_pred` and `y_true` are the same size.\"\n", " if sigmoid: y_pred = y_pred.sigmoid()\n", " return ((y_pred>thresh)==y_true[:,None].expand_as(y_pred).byte()).float().mean()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pretrain the critic on crappy vs not crappy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_critic = Learner(data_crit, critic(), metrics=accuracy_thresh_expand, loss_func=AdaptiveLoss(nn.BCEWithLogitsLoss()))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_critic.fit_one_cycle(6, 1e-3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_critic.save('critic-pre2')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pre-train generator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's pretrain the generator." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "arch = models.resnet34\n", "src = ImageImageList.from_folder(path_lr).random_split_by_pct(0.1, seed=42)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_data(bs,size):\n", " data = (src.label_from_func(lambda x: path_hr/x.name)\n", " .transform(get_transforms(max_zoom=2.), size=size, tfm_y=True)\n", " .databunch(bs=bs).normalize(imagenet_stats))\n", "\n", " data.c = 3\n", " return data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_gen = get_data(bs,size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wd = 1e-3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen = unet_learner(data_gen, arch, wd=wd, blur=True, norm_type=NormType.Spectral, self_attention=True, sigmoid=True,\n", " loss_func=MSELossFlat())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen.fit_one_cycle(2, pct_start=0.8)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen.unfreeze()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen.fit_one_cycle(2, slice(1e-6,1e-3))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_gen = get_data(bs,size)\n", "learn_gen.data = data_gen" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen.fit_one_cycle(2, slice(1e-6,1e-3))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ 
"learn_gen.show_results(rows=8)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen.save('gen-pre2')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## GAN" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll combine those pretrained model in a GAN." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fastai.vision.gan import *" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Those are the losses from before." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "loss_critic = AdaptiveLoss(nn.BCEWithLogitsLoss())\n", "loss_gen = MSELossFlat()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_crit = Learner(data_crit, critic(), loss_func=loss_critic).load('critic-pre2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn_gen = unet_learner(data_gen, arch, wd=wd, blur=True, norm_type=NormType.Spectral, self_attention=True, sigmoid=True,\n", " loss_func=MSELossFlat()).load('gen-pre2')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To define a GAN Learner, we just have to specify the learner objects foor the generator and the critic. The switcher is a callback that decides when to switch from discriminator to generator and vice versa. Here we do as many iterations of the discriminator as needed to get its loss back < 0.5 then one iteration of the generator.\n", "\n", "The loss of the critic is given by `learn_crit.loss_func`. We take the average of this loss function on the batch of real predictions (target 1) and the batch of fake predicitions (target 0). \n", "\n", "The loss of the generator is weighted sum (weights in `weights_gen`) of `learn_crit.loss_func` on the batch of fake (passed throught the critic to become predictions) with a target of 1, and the `learn_gen.loss_func` applied to the output (batch of fake) and the target (corresponding batch of superres images)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@dataclass\n", "class GANDiscriminativeLR(LearnerCallback):\n", " \"`Callback` that handles multiplying the learning rate by `mult_lr` for the critic.\"\n", " mult_lr:float = 1.\n", "\n", " def on_batch_begin(self, train, **kwargs):\n", " \"Multiply the current lr if necessary.\"\n", " if not self.learn.gan_trainer.gen_mode and train:\n", " self.learn.opt.lr *= self.mult_lr\n", "\n", " def on_step_end(self, **kwargs):\n", " \"Put the LR back to its value if necessary.\"\n", " if not self.learn.gan_trainer.gen_mode:\n", " self.learn.opt.lr /= self.mult_lr" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#switcher = partial(AdaptiveGANSwitcher, critic_thresh=0.5)\n", "switcher = partial(FixedGANSwitcher, n_crit=1, n_gen=1)\n", "learn = GANLearner.from_learners(learn_gen, learn_crit, weights_gen=(1.,250.), show_img=False, switcher=switcher,\n", " opt_func=partial(optim.Adam, betas=(0.,0.99)), wd=0)\n", "learn.callback_fns.append(partial(GANDiscriminativeLR, mult_lr=5.))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn.fit(20,1e-4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "learn.fit(20,1e-4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Without dense block, adaptive schedule\n", "learn.show_results()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## fin" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 2 }