# %% notebook setup — auto-reload edited source modules so nb_004b changes are picked up live
# %reload_ext autoreload
# %autoreload 2

# %%
#export
from nb_004b import *
import torchvision.models as tvm

# %% [markdown]
# # Dogs and cats

# %% [markdown]
# ## Basic data aug

# %%
PATH = Path('data/dogscats')
arch = tvm.resnet34

# %%
#export
def uniform_int(low:Number, high:Number, size:Optional[List[int]]=None)->FloatOrTensor:
    "Generate an int (or tensor of `size` ints) from uniform(`low`,`high`), both bounds included."
    # FIX: `random.randint` includes `high` but `torch.randint` excludes it; use
    # `high+1` so the scalar and tensor paths sample the same (inclusive) range.
    return random.randint(low,high) if size is None else torch.randint(low,high+1,size)

@TfmPixel
def dihedral(x, k:partial(uniform_int,0,7)):
    "Apply the `k`-th (0-7) dihedral-group symmetry (flips and/or transpose) to image `x`."
    # FIX: sample k from 0..7 (the 8 symmetries); the former (0,8) drew 9 values
    # with the now-inclusive sampler above, double-weighting the identity transform.
    flips=[]
    if k&1: flips.append(1)          # flip along dim 1
    if k&2: flips.append(2)          # flip along dim 2
    if flips: x = torch.flip(x,flips)
    if k&4: x = x.transpose(1,2)     # swap the two spatial dims
    return x.contiguous()

# %%
#export
def get_transforms(do_flip:bool=True, flip_vert:bool=False, max_rotate:float=10., max_zoom:float=1.1,
                   max_lighting:float=0.2, max_warp:float=0.2, p_affine:float=0.75,
                   p_lighting:float=0.75, xtra_tfms:Optional[Collection[Transform]]=None)->Collection[Transform]:
    "Utility func to easily create list of `flip`, `rotate`, `zoom`, `warp`, `lighting` transforms"
    res = [rand_crop()]
    if do_flip:    res.append(dihedral() if flip_vert else flip_lr(p=0.5))
    if max_warp:   res.append(symmetric_warp(magnitude=(-max_warp,max_warp), p=p_affine))
    if max_rotate: res.append(rotate(degrees=(-max_rotate,max_rotate), p=p_affine))
    if max_zoom>1: res.append(rand_zoom(scale=(1.,max_zoom), p=p_affine))
    if max_lighting:
        res.append(brightness(change=(0.5*(1-max_lighting), 0.5*(1+max_lighting)), p=p_lighting))
        res.append(contrast(scale=(1-max_lighting, 1/(1-max_lighting)), p=p_lighting))
    #       train                          , valid
    return (res + listify(xtra_tfms), [crop_pad()])

# Per-channel mean/std of ImageNet; used to normalize inputs for pretrained models.
imagenet_stats = tensor([0.485, 0.456, 0.406]), tensor([0.229, 0.224, 0.225])
imagenet_norm,imagenet_denorm = normalize_funcs(*imagenet_stats)

# %%
size=224

tfms = get_transforms(do_flip=True, max_rotate=10, max_zoom=1.2, max_lighting=0.3, max_warp=0.15)
data = data_from_imagefolder(PATH, test='test1', bs=64, ds_tfms=tfms,
                             num_workers=8, tfms=imagenet_norm, size=size)

# %%
(x,y) = next(iter(data.valid_dl))

_,axs = plt.subplots(4,4,figsize=(12,12))
for i,ax in enumerate(axs.flatten()): show_image(imagenet_denorm(x[i].cpu()), ax)

# %%
(x,y) = next(iter(data.test_dl))

_,axs = plt.subplots(4,4,figsize=(12,12))
for i,ax in enumerate(axs.flatten()): show_image(imagenet_denorm(x[i].cpu()), ax)

# %%
# Show all 8 dihedral variants of one validation image.
x=data.valid_ds[2][0]
_,axes = plt.subplots(2,4, figsize=(12,6))
for i,ax in enumerate(axes.flat): dihedral(x,i).show(ax)

# %% [markdown]
# ## ConvLearner

# %%
#export
def train_epoch(model:Model, dl:DataLoader, opt:optim.Optimizer, loss_func:LossFunction)->None:
    "Simple training of `model` for 1 epoch of `dl` using optim `opt` and loss function `loss_func`"
    model.train()
    for xb,yb in dl:
        loss = loss_func(model(xb), yb)
        loss.backward()
        opt.step()
        opt.zero_grad()

# %%
#export
class AdaptiveConcatPool2d(nn.Module):
    "Layer that concats `AdaptiveAvgPool2d` and `AdaptiveMaxPool2d`"
    def __init__(self, sz:Optional[int]=None):
        "Output will be 2*sz or 2 if sz is None"
        super().__init__()
        sz = sz or 1
        self.ap,self.mp = nn.AdaptiveAvgPool2d(sz), nn.AdaptiveMaxPool2d(sz)
    def forward(self, x): return torch.cat([self.mp(x), self.ap(x)], 1)

def create_body(model:Model, cut:Optional[int]=None, body_fn:Callable[[Model],Model]=None):
    "Cut off the body of a typically pretrained model at `cut` or as specified by `body_fn`"
    return (nn.Sequential(*list(model.children())[:cut]) if cut
            else body_fn(model) if body_fn else model)

def num_features(m:Model)->Optional[int]:
    "Return the number of output features for a model (None if no layer exposes `num_features`)."
    # Walk layers last-to-first; the last batchnorm-style layer carries the
    # channel count of the body's output.
    for l in reversed(flatten_model(m)):
        if hasattr(l, 'num_features'): return l.num_features

# %%
model = create_body(arch(), -2)
num_features(model)

# %%
#export
def bn_drop_lin(n_in:int, n_out:int, bn:bool=True, p:float=0., actn:Optional[nn.Module]=None):
    "`n_in`->bn->dropout->linear(`n_in`,`n_out`)->`actn`"
    layers = [nn.BatchNorm1d(n_in)] if bn else []
    if p != 0: layers.append(nn.Dropout(p))
    layers.append(nn.Linear(n_in, n_out))
    if actn is not None: layers.append(actn)
    return layers

def create_head(nf:int, nc:int, lin_ftrs:Optional[Collection[int]]=None, ps:Floats=0.5):
    """Model head that takes `nf` features, runs through `lin_ftrs`, and ends with `nc` classes.
    `ps` is for dropout and can be a single float or a list for each layer"""
    lin_ftrs = [nf, 512, nc] if lin_ftrs is None else [nf] + lin_ftrs + [nc]
    ps = listify(ps)
    # A single dropout p is split: half strength for hidden layers, full for the last.
    if len(ps)==1: ps = [ps[0]/2] * (len(lin_ftrs)-2) + ps
    actns = [nn.ReLU(inplace=True)] * (len(lin_ftrs)-2) + [None]
    layers = [AdaptiveConcatPool2d(), Flatten()]
    for ni,no,p,actn in zip(lin_ftrs[:-1],lin_ftrs[1:],ps,actns):
        layers += bn_drop_lin(ni,no,True,p,actn)
    return nn.Sequential(*layers)

# %%
create_head(512, 2)

# %%
#export

LayerFunc = Callable[[nn.Module],None]

def cond_init(m:nn.Module, init_fn:LayerFunc):
    "Initialize the non-batchnorm layers"
    # Batchnorm layers keep their default init; frozen layers are skipped.
    if (not isinstance(m, bn_types)) and requires_grad(m):
        if hasattr(m, 'weight'): init_fn(m.weight)
        if hasattr(m, 'bias') and hasattr(m.bias, 'data'): m.bias.data.fill_(0.)

def apply_leaf(m:nn.Module, f:LayerFunc):
    "Apply `f` to `m` and recursively to every submodule of `m`."
    c = children(m)
    if isinstance(m, nn.Module): f(m)
    for l in c: apply_leaf(l,f)

def apply_init(m, init_fn:LayerFunc):
    "Initialize all non-batchnorm layers of model with `init_fn`"
    apply_leaf(m, partial(cond_init, init_fn=init_fn))

def _init(learn, init): apply_init(learn.model, init)
Learner.init = _init

def _default_split(m:Model):
    "By default split models between first and second layer"
    return split_model(m, m[1])

def _resnet_split(m:Model):
    "Split a resnet style model"
    return split_model(m, (m[0][6],m[1]))

# Per-architecture defaults: where to cut the pretrained body and how to split
# the layer groups for discriminative learning rates.
_default_meta = {'cut':-1, 'split':_default_split}
_resnet_meta  = {'cut':-2, 'split':_resnet_split }

model_meta = {
    tvm.resnet18 :{**_resnet_meta}, tvm.resnet34: {**_resnet_meta},
    tvm.resnet50 :{**_resnet_meta}, tvm.resnet101:{**_resnet_meta},
    tvm.resnet152:{**_resnet_meta}}

class ConvLearner(Learner):
    "Builds convnet style learners"
    def __init__(self, data:DataBunch, arch:Callable, cut=None, pretrained:bool=True,
                 lin_ftrs:Optional[Collection[int]]=None, ps:Floats=0.5,
                 custom_head:Optional[nn.Module]=None, split_on:Optional[SplitFuncOrIdxList]=None, **kwargs:Any)->None:
        meta = model_meta.get(arch, _default_meta)
        torch.backends.cudnn.benchmark = True
        body = create_body(arch(pretrained), ifnone(cut,meta['cut']))
        nf = num_features(body) * 2   # *2: AdaptiveConcatPool2d doubles the channels
        head = custom_head or create_head(nf, data.c, lin_ftrs, ps)
        model = nn.Sequential(body, head)
        super().__init__(data, model, **kwargs)
        self.split(ifnone(split_on,meta['split']))
        if pretrained: self.freeze()  # train only the new head at first
        apply_init(model[1], nn.init.kaiming_normal_)

# %% [markdown]
# ## Train

# %%
learn = ConvLearner(data, arch, metrics=accuracy)

# %%
type(arch)

# %%
# lr_find(learn)
# learn.recorder.plot()

# %%
learn.fit_one_cycle(4)

# %%
learn.save('0')

# %% [markdown]
# ## Unfreeze

# %%
learn.load('0')
# %% Fine-tune: unfreeze the whole network after the head has been trained
# %%
learn.unfreeze()

# %%
lr=1e-4

# %%
# Discriminative learning rates: earliest layer group trains at lr/25, the head at lr.
learn.fit_one_cycle(6, slice(lr/25,lr), pct_start=0.05)

# %%
learn.save('1')

# %% [markdown]
# ## TTA

# %%
learn.load('1')

# %%
model = learn.model

# %%
#export
def pred_batch(learn:Learner, is_valid:bool=True) -> Tuple[Tensors, Tensors, Tensors]:
    "Returns input, target and output of the model on a batch"
    # One batch from the valid (or train) dataloader; output is detached so it
    # can be inspected without keeping the autograd graph alive.
    x,y = next(iter(learn.data.valid_dl if is_valid else learn.data.train_dl))
    return x,y,learn.model(x).detach()
Learner.pred_batch = pred_batch

def get_preds(model:Model, dl:DataLoader, pbar:Optional[PBar]=None) -> List[Tensor]:
    "Predicts the output of the elements in the dataloader"
    # `validate` yields per-batch result lists; concatenate each and move to CPU.
    return [torch.cat(o).cpu() for o in validate(model, dl, pbar=pbar)]

def _learn_get_preds(learn:Learner, is_test:bool=False) -> List[Tensor]:
    "Wrapper of get_preds for learner"
    # `holdout` selects the test or validation dataloader of the learner's data.
    return get_preds(learn.model, learn.data.holdout(is_test))
Learner.get_preds = _learn_get_preds

def show_image_batch(dl:DataLoader, classes:Collection[str], rows:int=None, figsize:Tuple[int,int]=(12,15),
                     denorm:Callable=None) -> None:
    "Show a few images from a batch"
    x,y = next(iter(dl))
    if rows is None: rows = int(math.sqrt(len(x)))  # default: largest square grid that fits the batch
    x = x[:rows*rows].cpu()
    if denorm: x = denorm(x)
    show_images(x,y[:rows*rows].cpu(),rows, classes)

# %%
denormalize

# %%
#export
def _tta_only(learn:Learner, is_test:bool=False, scale:float=1.25) -> Iterator[List[Tensor]]:
    "Computes the outputs for several augmented inputs for TTA"
    dl = learn.data.holdout(is_test)
    ds = dl.dataset
    old = ds.tfms
    # Keep the train-time augmentations except those that the fixed crop/flip/zoom
    # grid below replaces deterministically.
    augm_tfm = [o for o in learn.data.train_ds.tfms if o.tfm not in
               (crop_pad, flip_lr, dihedral, zoom)]
    try:
        pbar = master_bar(range(8))
        for i in pbar:
            # 8 deterministic variants: 4 crop positions x (no flip / horizontal flip)
            row = 1 if i&1 else 0
            col = 1 if i&2 else 0
            flip = i&4
            d = {'row_pct':row, 'col_pct':col, 'is_random':False}
            tfm = [*augm_tfm, zoom(scale=scale, **d), crop_pad(**d)]
            if flip: tfm.append(flip_lr(p=1.))
            ds.tfms = tfm
            yield get_preds(learn.model, dl, pbar=pbar)[0]
    finally: ds.tfms = old  # always restore the dataset's original transforms
    
Learner.tta_only = _tta_only

# %%
t = list(learn.tta_only(scale=1.35))

# %%
preds,y = get_preds(model, data.valid_dl)

# %%
accuracy(preds, y)

# %%
# Average the 8 augmented prediction sets into a single TTA prediction.
avg_preds = torch.stack(t).mean(0)
avg_preds.shape, accuracy(avg_preds, y)

# %%
accuracy(preds*0.5 + avg_preds*0.5, y)

# %%
# Sweep the blend weight between plain and TTA predictions.
[(beta,accuracy(preds*beta + avg_preds*(1-beta), y)) for beta in np.linspace(0,1,11)]
# %%
#export
def _TTA(learn:Learner, beta:float=0.4, scale:float=1.35, is_test:bool=False) -> Tensors:
    "Blend plain predictions with averaged TTA predictions: `beta*preds + (1-beta)*avg_preds`."
    preds,y = learn.get_preds(is_test)
    all_preds = list(learn.tta_only(scale=scale, is_test=is_test))
    avg_preds = torch.stack(all_preds).mean(0)
    # NOTE(review): return arity differs by branch — (preds, avg_preds, y) when
    # `beta is None`, (blended, y) otherwise; callers must handle both shapes.
    if beta is None: return preds,avg_preds,y
    else: return preds*beta + avg_preds*(1-beta), y

Learner.TTA = _TTA

# %%
learn = ConvLearner(data, arch, metrics=accuracy, path=PATH)

# %%
learn.load('1')

# %%
tta_preds = learn.TTA()

# %%
accuracy(*tta_preds)

# %% [markdown]
# ## Fin