{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%reload_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fastai import *\n", "from fastai.vision import *" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Faces" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download the dataset [here](https://download.pytorch.org/tutorial/faces.zip) from the pytoch tutorial on transforms. Unzip it in the data directory, so that data/faces/ contains the images and the csv file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "PATH = Path('../../data/faces/')\n", "img_fns = get_image_files(PATH)\n", "len(img_fns)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "poses = pd.read_csv(PATH/'face_landmarks.csv')\n", "poses.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pose_dict = {o[0]:o[1:].astype(np.float32) for o in poses.values}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Reading the coordinates. We will adopt the pytorch convention in grid_sampler where the coordinates are normalized between -1 and 1. (-1,-1) is the top left corner, (1,1) the bottom right. This function scales or unscales the coordinates to that purpose " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def scale_flow(flow, to_unit=True):\n", " s = tensor([flow.size[0]/2,flow.size[1]/2])[None]\n", " if to_unit: flow.flow = flow.flow/s-1\n", " else: flow.flow = (flow.flow+1)*s\n", " return flow" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pose_pnts = []\n", "for i, fname in enumerate(img_fns):\n", " size = open_image(fname).size\n", " coords = tensor(pose_dict[fname.name]).view(-1,2)\n", " pose_pnts.append(coords)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "len(pose_pnts)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pose_pnts[0].shape,pose_pnts[0][0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's have a look at the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def show_pose(img, pnts, ax=None):\n", " if ax is None: _,ax = plt.subplots()\n", " img.show(ax=ax, hide_axis=False)\n", " ax.scatter(pnts[:, 0], pnts[:, 1], s=10, marker='.', c='r')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "img = open_image(img_fns[0])\n", "show_pose(img, pose_pnts[0])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ImagePoints" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So when we change the picture, the points must be changed accordingly. Specifically, transforms that need to be applied to the points are the reciprocal functions that those applied to pixel values. To deal with that:\n", "- we use an `affine_inv_mult` function to do the inverse of the affine transforms\n", "- we add an `invert` bool argument (default `False`) to coord transforms to get that inverse operation done\n", "\n", "Additionaly, we use singledispatch to change the implementation of pixel transforms for coords." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def _affine_inv_mult(c, m):\n", " \"Applies the inverse affine transform described in m\"\n", " size = c.flow.size()\n", " h,w = c.size\n", " m[0,1] *= h/w\n", " m[1,0] *= w/h\n", " c.flow = c.flow.view(-1,2)\n", " a = torch.inverse(m[:2,:2].t())\n", " c.flow = torch.mm(c.flow - m[:2,2], a).view(size)\n", " return c" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "class ImagePoints1(Image):\n", " \"Support applying transforms to a flow points.\"\n", " def __init__(self, flow:FlowField, scale:bool=True, y_first=True):\n", " \"Create from raw tensor image data `px`.\"\n", " if scale: flow = scale_flow(flow)\n", " if y_first: flow.flow = flow.flow.flip(1)\n", " self._flow = flow\n", " self._affine_mat = None\n", " self.flow_func = []\n", " self.sample_kwargs = {}\n", " self.transformed = False\n", "\n", " def clone(self):\n", " \"Mimic the behavior of torch.clone for `Image` objects.\"\n", " return self.__class__(FlowField(self.size, self.flow.flow.clone()), scale=False, y_first=False)\n", "\n", " @property\n", " def shape(self)->Tuple[int,int,int]: return (1, *self._flow.size)\n", " @property\n", " def size(self)->Tuple[int,int]: return self._flow.size\n", " @size.setter\n", " def size(self, sz:int): self._flow.size=sz\n", " @property\n", " def device(self)->torch.device: return self._flow.flow.device\n", "\n", " def __repr__(self): return f'{self.__class__.__name__} {tuple(self.size)}'\n", " \n", " @property\n", " def flow(self)->FlowField:\n", " \"Access the flow-field grid after applying queued affine and coord transforms.\"\n", " if self._affine_mat is not None:\n", " self._flow = _affine_inv_mult(self._flow, self._affine_mat)\n", " self._affine_mat = None\n", " self.transformed = True\n", " if len(self.flow_func) != 0:\n", " for f in self.flow_func[::-1]: self._flow = f(self._flow)\n", " self.transformed = True\n", " self.flow_func = []\n", " return self._flow\n", " \n", " @flow.setter\n", " def flow(self,v:FlowField): self._flow=v\n", " \n", " def coord(self, func:CoordFunc, *args, **kwargs)->'Image':\n", " \"Put `func` with `args` and `kwargs` in `self.flow_func` for later.\"\n", " if 'invert' in kwargs: kwargs['invert'] = True\n", " else: warn(f\"{func.__name__} isn't implemented for `ImagePoints`.\")\n", " self.flow_func.append(partial(func, *args, **kwargs))\n", " return self\n", "\n", " def lighting(self, func:LightingFunc, *args:Any, **kwargs:Any)->'Image': return self\n", "\n", " def pixel(self, func:PixelFunc, *args, **kwargs)->'Image':\n", " \"Equivalent to `self = func_flow(self)`.\"\n", " self = func(self, *args, **kwargs)\n", " self.transformed=True\n", " return self\n", " \n", " def refresh(self):\n", " return self\n", " \n", " def resize(self, size:Union[int,TensorImageSize]):\n", " \"Resize the image to `size`, size can be a single int.\"\n", " if isinstance(size, int): size=(1, size, size)\n", " self._flow.size = size[1:]\n", " return self\n", " \n", " @property\n", " def data(self)->TensorImage:\n", " \"Return the points associated to this object.\"\n", " flow = self.flow #This updates flow before we test if some transforms happened\n", " if self.transformed:\n", " if 'remove_out' not in self.sample_kwargs or self.sample_kwargs['remove_out']:\n", " flow = _remove_points_out(flow)\n", " self.transformed=False\n", " return flow.flow.flip(1)\n", " \n", " def show(self, ax=None, figsize=(3,3), title:Optional[str]=None, hide_axis:bool=True):\n", " if ax is None: _,ax = plt.subplots(figsize=figsize)\n", " pnt = scale_flow(FlowField(self.size, self.data), to_unit=False).flow.flip(1)\n", " ax.scatter(pnt[:, 0], pnt[:, 1], s=10, marker='.', c='r')\n", " if hide_axis: ax.axis('off')\n", " if title: ax.set_title(title)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def _remove_points_out(flow:FlowField):\n", " pad_mask = (flow.flow[:,0] >= -1) * (flow.flow[:,0] <= 1) * (flow.flow[:,1] >= -1) * (flow.flow[:,1] <= 1)\n", " flow.flow = flow.flow[pad_mask]\n", " return flow" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "img = open_image(img_fns[0])\n", "pnts = ImagePoints1(FlowField(img.size, pose_pnts[0].flip(1)))\n", "img.show(y=pnts)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def test_tfm(x, y, tfms, **kwargs):\n", " tfm_x = apply_tfms(tfms, x, **kwargs)\n", " tfm_y = apply_tfms(tfms, y, do_resolve=False, **kwargs)\n", " return tfm_x, tfm_y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pixel tranforms aren't supported for points. The only pixel transforms in fastai are `flip_lr`, `dihedral`, `crop`, `pad` and `crop_pad`. We can implement the first two at an affine or coord level, the only downside is that it will trigger an unnecessary interpolation for the image (if we didn't need one). The last two are a bit more messy because they change the size of the image..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "@TfmAffine\n", "def flip_affine() -> TfmAffine:\n", " return [[-1, 0, 0.],\n", " [0, 1, 0],\n", " [0, 0, 1.]]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfms = [rand_zoom(scale=(1.,1.25)), rotate(degrees=(-30,30)), flip_affine(p=0.5)]\n", "_, axs = plt.subplots(2, 4, figsize=(10,5))\n", "for i, ax in enumerate(axs.flatten()):\n", " tfm_x, tfm_y = test_tfm(img, pnts, tfms)\n", " tfm_x.show(ax=ax, y = tfm_y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default, point outs of the image are removed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfms = [rotate(degrees=-30)]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x,tfm_y = test_tfm(img, pnts, tfms, padding_mode='zeros')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x.show(y=tfm_y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But you can change this with `remove_out=False`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x,tfm_y = test_tfm(img, pnts, tfms, padding_mode='zeros', remove_out=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x.show(y=tfm_y)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def _pad_coord(x, row_pad:int, col_pad:int, mode='zeros'):\n", " #TODO: implement other padding modes than zeros?\n", " h,w = x.size\n", " pad = torch.Tensor([w/(w + 2*col_pad), h/(h + 2*row_pad)])\n", " x.flow = FlowField((h+2*row_pad, w+2*col_pad) , x.flow.flow * pad[None])\n", " #x.flow.flow.mul_(pad[None])\n", " #x.size = (h+2*row_pad, w+2*col_pad) \n", " return x" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "from functools import singledispatch" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "_pad_mode_convert = {'reflection':'reflect', 'zeros':'constant', 'border':'replicate'}\n", "\n", "@partial(TfmPixel, order=-10)\n", "@singledispatch\n", "def pad(x, padding:int, mode='reflection'):\n", " \"Pad `x` with `padding` pixels. `mode` fills in space ('zeros','reflection','border').\"\n", " mode = _pad_mode_convert[mode]\n", " return F.pad(x[None], (padding,)*4, mode=mode)[0]\n", "\n", "@pad.register(ImagePoints1)\n", "def _(x, padding:int, mode='reflection'):\n", " return _pad_coord(x, padding, padding, mode)\n", "\n", "@TfmPixel\n", "@singledispatch\n", "def crop(x, size, row_pct:uniform=0.5, col_pct:uniform=0.5):\n", " \"Crop `x` to `size` pixels. `row_pct`,`col_pct` select focal point of crop.\"\n", " size = listify(size,2)\n", " rows,cols = size\n", " row = int((x.size(1)-rows+1) * row_pct)\n", " col = int((x.size(2)-cols+1) * col_pct)\n", " return x[:, row:row+rows, col:col+cols].contiguous()\n", "\n", "@crop.register(ImagePoints1)\n", "def _(x, size, row_pct=0.5, col_pct=0.5):\n", " h,w = x.size\n", " rows,cols = listify(size, 2)\n", " x.flow.flow.mul_(torch.Tensor([w/cols, h/rows])[None])\n", " row = int((h-rows+1) * row_pct)\n", " col = int((w-cols+1) * col_pct)\n", " x.flow.flow.add_(-1 + torch.Tensor([w/cols-2*col/cols, h/rows-2*row/rows])[None])\n", " x.size = (rows, cols)\n", " return x\n", "\n", "@TfmCrop\n", "@singledispatch\n", "def crop_pad(x, size, padding_mode='reflection',\n", " row_pct:uniform = 0.5, col_pct:uniform = 0.5):\n", " \"Crop and pad tfm - `row_pct`,`col_pct` sets focal point.\"\n", " padding_mode = _pad_mode_convert[padding_mode]\n", " size = listify(size,2)\n", " if x.shape[1:] == size: return x\n", " rows,cols = size\n", " if x.size(1)Tensor:\n", " \"Find 8 coeff mentioned [here](https://web.archive.org/web/20150222120106/xenia.media.mit.edu/~cwren/interpolator/).\"\n", " matrix = []\n", " #The equations we'll need to solve.\n", " for p1, p2 in zip(targ_pts, orig_pts):\n", " matrix.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0]*p1[0], -p2[0]*p1[1]])\n", " matrix.append([0, 0, 0, p1[0], p1[1], 1, -p2[1]*p1[0], -p2[1]*p1[1]])\n", "\n", " A = FloatTensor(matrix)\n", " B = FloatTensor(orig_pts).view(8)\n", " #The 8 scalars we seek are solution of AX = B\n", " return torch.gesv(B,A)[0][:,0]\n", "\n", "def _apply_perspective(coords:FlowField, coeffs:Points)->FlowField:\n", " \"Transform `coords` with `coeffs`.\"\n", " size = coords.flow.size()\n", " #compress all the dims expect the last one ang adds ones, coords become N * 3\n", " coords.flow = coords.flow.view(-1,2)\n", " #Transform the coeffs in a 3*3 matrix with a 1 at the bottom left\n", " coeffs = torch.cat([coeffs, FloatTensor([1])]).view(3,3)\n", " coords.flow = torch.addmm(coeffs[:,2], coords.flow, coeffs[:,:2].t())\n", " coords.flow.mul_(1/coords.flow[:,2].unsqueeze(1))\n", " coords.flow = coords.flow[:,:2].view(size)\n", " return coords\n", "\n", "_orig_pts = [[-1,-1], [-1,1], [1,-1], [1,1]]\n", "\n", "def _perspective_warp(c:FlowField, targ_pts:Points, invert=False):\n", " \"Apply warp to `targ_pts` from `_orig_pts` to `c` `FlowField`.\"\n", " if invert: return _apply_perspective(c, _find_coeffs(targ_pts, _orig_pts))\n", " return _apply_perspective(c, _find_coeffs(_orig_pts, targ_pts))\n", "\n", "@TfmCoord\n", "def perspective_warp(c, magnitude:partial(uniform,size=8)=0, invert=False):\n", " \"Apply warp of `magnitude` to `c`.\"\n", " magnitude = magnitude.view(4,2)\n", " targ_pts = [[x+m for x,m in zip(xs, ms)] for xs, ms in zip(_orig_pts, magnitude)]\n", " return _perspective_warp(c, targ_pts, invert)\n", "\n", "@TfmCoord\n", "def symmetric_warp(c, magnitude:partial(uniform,size=4)=0, invert=False):\n", " \"Apply symmetric warp of `magnitude` to `c`.\"\n", " m = listify(magnitude, 4)\n", " targ_pts = [[-1-m[3],-1-m[1]], [-1-m[2],1+m[1]], [1+m[3],-1-m[0]], [1+m[2],1+m[0]]]\n", " return _perspective_warp(c, targ_pts, invert)\n", "\n", "@TfmCoord\n", "def tilt(c, direction:uniform_int, magnitude:uniform=0, invert=False):\n", " \"Tilt `c` field with random `direction` and `magnitude`.\"\n", " orig_pts = [[-1,-1], [-1,1], [1,-1], [1,1]]\n", " if direction == 0: targ_pts = [[-1,-1], [-1,1], [1,-1-magnitude], [1,1+magnitude]]\n", " elif direction == 1: targ_pts = [[-1,-1-magnitude], [-1,1+magnitude], [1,-1], [1,1]]\n", " elif direction == 2: targ_pts = [[-1,-1], [-1-magnitude,1], [1,-1], [1+magnitude,1]]\n", " elif direction == 3: targ_pts = [[-1-magnitude,-1], [-1,1], [1+magnitude,-1], [1,1]]\n", " coeffs = _find_coeffs(targ_pts, _orig_pts) if invert else _find_coeffs(_orig_pts, targ_pts)\n", " return _apply_perspective(c, coeffs)\n", "\n", "@TfmCoord\n", "def skew(c, direction:uniform_int, magnitude:uniform=0, invert=False):\n", " \"Skew `c` field with random `direction` and `magnitude`.\"\n", " orig_pts = [[-1,-1], [-1,1], [1,-1], [1,1]]\n", " if direction == 0: targ_pts = [[-1-magnitude,-1], [-1,1], [1,-1], [1,1]]\n", " elif direction == 1: targ_pts = [[-1,-1-magnitude], [-1,1], [1,-1], [1,1]]\n", " elif direction == 2: targ_pts = [[-1,-1], [-1-magnitude,1], [1,-1], [1,1]]\n", " elif direction == 3: targ_pts = [[-1,-1], [-1,1+magnitude], [1,-1], [1,1]]\n", " elif direction == 4: targ_pts = [[-1,-1], [-1,1], [1+magnitude,-1], [1,1]]\n", " elif direction == 5: targ_pts = [[-1,-1], [-1,1], [1,-1-magnitude], [1,1]]\n", " elif direction == 6: targ_pts = [[-1,-1], [-1,1], [1,-1], [1+magnitude,1]]\n", " elif direction == 7: targ_pts = [[-1,-1], [-1,1], [1,-1], [1,1+magnitude]]\n", " coeffs = _find_coeffs(targ_pts, _orig_pts) if invert else _find_coeffs(_orig_pts, targ_pts)\n", " return _apply_perspective(c, coeffs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfms = [tilt(direction=(0,3), magnitude=0.4)]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x,tfm_y = test_tfm(img, pnts, tfms, padding_mode='zeros', size=300)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfm_x.show(y=tfm_y)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfms = [symmetric_warp(magnitude=(-0.4,0.4))]\n", "tfm_x,tfm_y = test_tfm(img, pnts, tfms, padding_mode='zeros')\n", "tfm_x.show(y=tfm_y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## All at once" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_transforms(do_flip:bool=True, flip_vert:bool=False, max_rotate:float=10., max_zoom:float=1.1,\n", " max_lighting:float=0.2, max_warp:float=0.2, p_affine:float=0.75,\n", " p_lighting:float=0.75, xtra_tfms:float=None)->Collection[Transform]:\n", " \"Utility func to easily create a list of flip, rotate, `zoom`, warp, lighting transforms.\"\n", " res = [rand_crop()]\n", " if do_flip: res.append(dihedral() if flip_vert else flip_affine(p=0.5))\n", " if max_warp: res.append(symmetric_warp(magnitude=(-max_warp,max_warp), p=p_affine))\n", " if max_rotate: res.append(rotate(degrees=(-max_rotate,max_rotate), p=p_affine))\n", " if max_zoom>1: res.append(rand_zoom(scale=(1.,max_zoom), p=p_affine))\n", " if max_lighting:\n", " res.append(brightness(change=(0.5*(1-max_lighting), 0.5*(1+max_lighting)), p=p_lighting))\n", " res.append(contrast(scale=(1-max_lighting, 1/(1-max_lighting)), p=p_lighting))\n", " # train , valid\n", " return (res + listify(xtra_tfms), [crop_pad()])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tfms = get_transforms()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "_, axs = plt.subplots(2, 4, figsize=(10,5))\n", "for i, ax in enumerate(axs.flatten()):\n", " tfm_x, tfm_y = test_tfm(img, pnts, tfms[0], size=224)\n", " tfm_x.show(ax=ax, y = tfm_y)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 2 }