{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#default_exp data.load" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "from local.torch_basics import *\n", "from local.test import *\n", "\n", "from torch.utils.data.dataloader import _MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter,_DatasetKind\n", "_loaders = (_MultiProcessingDataLoaderIter,_SingleProcessDataLoaderIter)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from local.notebook.showdoc import *" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bs = 4\n", "letters = list(string.ascii_lowercase)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DataLoader" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def _wif(worker_id):\n", " set_num_threads(1)\n", " info = get_worker_info()\n", " ds = info.dataset.d\n", " ds.nw,ds.offs = info.num_workers,info.id\n", " set_seed(info.seed)\n", " ds.wif()\n", "\n", "class _FakeLoader(GetAttr):\n", " _auto_collation,collate_fn,drop_last,dataset_kind,_dataset_kind,_index_sampler = False,noops,False,_DatasetKind.Iterable,_DatasetKind.Iterable,Inf.count\n", " def __init__(self, d, pin_memory, num_workers, timeout):\n", " self.dataset,self.default,self.worker_init_fn = self,d,_wif\n", " store_attr(self, 'd,pin_memory,num_workers,timeout')\n", "\n", " def __iter__(self): return iter(self.d.create_batches(self.d.sample()))\n", "\n", " @property\n", " def multiprocessing_context(self): return (None,multiprocessing)[self.num_workers>0]\n", "\n", " @contextmanager\n", " def no_multiproc(self):\n", " old_nw = self.num_workers\n", " try:\n", " self.num_workers = 0\n", " yield self.d\n", " finally: self.num_workers = old_nw\n", "\n", "_collate_types = (ndarray, Tensor, typing.Mapping, str)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def fa_collate(t):\n", " b = t[0]\n", " return (default_collate(t) if isinstance(b, _collate_types)\n", " else type(t[0])([fa_collate(s) for s in zip(*t)]) if isinstance(b, Sequence)\n", " else default_collate(t))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#e.g. 
x is int, y is tuple\n", "t = [(1,(2,3)),(1,(2,3))]\n", "test_eq(fa_collate(t), default_collate(t))\n", "test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])\n", "\n", "t = [(1,(2,(3,4))),(1,(2,(3,4)))]\n", "test_eq(fa_collate(t), default_collate(t))\n", "test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])\n", "test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def fa_convert(t):\n", " return (default_convert(t) if isinstance(t, _collate_types)\n", " else type(t)([fa_convert(s) for s in t]) if isinstance(t, Sequence)\n", " else default_convert(t))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "t0 = array([1,2])\n", "t = [t0,(t0,t0)]\n", "\n", "test_eq(fa_convert(t), default_convert(t))\n", "test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "class SkipItemException(Exception): pass" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "@funcs_kwargs\n", "class DataLoader(GetAttr):\n", " wif=before_iter=after_item=before_batch=after_batch=after_iter = noops\n", " _methods = 'wif before_iter create_batches create_item after_item before_batch create_batch retain after_batch after_iter'.split()\n", " _default = 'dataset'\n", " def __init__(self, dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0,\n", " shuffle=False, drop_last=False, indexed=None, n=None, **kwargs):\n", " assert not (bs is None and drop_last)\n", " if indexed is None: indexed = dataset is not None and hasattr(dataset,'__getitem__')\n", " if n is None:\n", " try: n = len(dataset)\n", " except TypeError: pass\n", " store_attr(self, 'dataset,bs,shuffle,drop_last,indexed,n,pin_memory,timeout')\n", " self.rng,self.nw,self.offs = random.Random(),1,0\n", " self.fake_l = _FakeLoader(self, pin_memory, num_workers, timeout)\n", "\n", " def __len__(self):\n", " if self.n is None: raise TypeError\n", " if self.bs is None: return self.n\n", " return self.n//self.bs + (0 if self.drop_last or self.n%self.bs==0 else 1)\n", " \n", " def get_idxs(self):\n", " idxs = Inf.count if self.indexed else Inf.nones\n", " if self.n is not None: idxs = list(itertools.islice(idxs, self.n))\n", " if self.shuffle: idxs = self.shuffle_fn(idxs)\n", " return idxs\n", " \n", " def sample(self):\n", " idxs = self.get_idxs()\n", " return (b for i,b in enumerate(idxs) if i//(self.bs or 1)%self.nw==self.offs)\n", " \n", " def __iter__(self):\n", " self.randomize()\n", " self.before_iter()\n", " for b in _loaders[self.fake_l.num_workers==0](self.fake_l): yield self.after_batch(b)\n", " self.after_iter()\n", " if hasattr(self, 'it'): delattr(self, 'it')\n", "\n", " def create_batches(self, samps):\n", " self.it = iter(self.dataset) if self.dataset is not None else None\n", " res = filter(lambda o:o is not None, map(self.do_item, samps))\n", " yield from map(self.do_batch, self.chunkify(res))\n", "\n", " def new(self, dataset=None, cls=None, **kwargs):\n", " if dataset is None: dataset = self.dataset\n", " if cls is None: cls = type(self)\n", " cur_kwargs = dict(dataset=dataset, num_workers=self.fake_l.num_workers, pin_memory=self.pin_memory, timeout=self.timeout,\n", " bs=self.bs, shuffle=self.shuffle, drop_last=self.drop_last, indexed=self.indexed)\n", " for n in self._methods: cur_kwargs[n] = getattr(self, n)\n", " return 
cls(**merge(cur_kwargs, kwargs))\n", " \n", " @property\n", " def prebatched(self): return self.bs is None\n", " def do_item(self, s):\n", " try: return self.after_item(self.create_item(s))\n", " except SkipItemException: return None\n", " def chunkify(self, b): return b if self.prebatched else chunked(b, self.bs, self.drop_last)\n", " def shuffle_fn(self, idxs): return self.rng.sample(idxs, len(idxs))\n", " def randomize(self): self.rng = random.Random(self.rng.randint(0,2**32-1))\n", " def retain(self, res, b): return retain_types(res, b[0] if is_listy(b) else b)\n", " def create_item(self, s): return next(self.it) if s is None else self.dataset[s]\n", " def create_batch(self, b): return (fa_collate,fa_convert)[self.prebatched](b)\n", " def do_batch(self, b): return self.retain(self.create_batch(self.before_batch(b)), b)\n", " def one_batch(self):\n", " with self.fake_l.no_multiproc(): return first(self)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Override `create_item` and use the default infinite sampler to get a stream of unknown length (call `stop()` when you want to end the stream)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(#119) [0.6499190875330292,0.6920993143601468,0.454128557091131,0.4621040162263076,0.701786166549396,0.09402992729389326,0.7352702711396566,0.5721437125266009,0.639819551789149,0.4898752410585695...]" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "class RandDL(DataLoader):\n", " def create_item(self, s):\n", " r = random.random()\n", " return r if r<0.95 else stop()\n", "\n", "L(RandDL())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(#1) [4]" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "L(RandDL(bs=4, drop_last=True)).map(len)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(#19) [4,4,4,4,4,4,4,4,4,4...]" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dl = RandDL(bs=4, num_workers=4, drop_last=True)\n", "L(dl).map(len)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_eq(dl.fake_l.num_workers, 4)\n", "with dl.fake_l.no_multiproc(): \n", " test_eq(dl.fake_l.num_workers, 0)\n", " L(dl).map(len)\n", "test_eq(dl.fake_l.num_workers, 4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(#2) [0.12919553532781858,0.8398403767094285]" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def _rand_item(s):\n", " r = random.random()\n", " return r if r<0.95 else stop()\n", "\n", "L(DataLoader(create_item=_rand_item))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ds1 = DataLoader(letters)\n", "test_eq(L(ds1), letters)\n", "test_eq(len(ds1), 26)\n", "\n", "test_shuffled(L(DataLoader(letters, shuffle=True)), letters)\n", "\n", "ds1 = DataLoader(letters, indexed=False)\n", "test_eq(L(ds1), letters)\n", "test_eq(len(ds1), 26)\n", "\n", "t2 = L(tensor([0,1,2]),tensor([3,4,5]))\n", "ds2 = DataLoader(t2)\n", "test_eq_type(L(ds2), t2)\n", "\n", "t3 = L(array([0,1,2]),array([3,4,5]))\n", "ds3 = DataLoader(t3)\n", "test_eq_type(L(ds3), t3.map(tensor))\n", "\n", "ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))\n", "test_eq_type(L(ds4), t3)\n", "test_eq(t3.f, 1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def twoepochs(d): return ' '.join(''.join(o) for _ in range(2) for o in d)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)\n", "test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')\n", "\n", "ds1 = DataLoader(letters,4,num_workers=2)\n", "test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')\n", "\n", "ds1 = DataLoader(range(12), bs=4, num_workers=3)\n", "test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))\n", "\n", "ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))\n", "test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))\n", "test_eq(t3.f, 2)\n", "\n", "it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))\n", "test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4.3 ms, sys: 64 µs, total: 4.36 ms\n", "Wall time: 243 ms\n", "CPU times: user 10.4 ms, sys: 13.8 ms, total: 24.2 ms\n", "Wall time: 195 ms\n", "CPU times: user 8.62 ms, sys: 26.4 ms, total: 35 ms\n", "Wall time: 133 ms\n" ] } ], "source": [ "class SleepyDL(list):\n", " def __getitem__(self,i):\n", " time.sleep(random.random()/50)\n", " return super().__getitem__(i)\n", "\n", "t = SleepyDL(letters)\n", "\n", "%time test_eq(DataLoader(t, num_workers=0), letters)\n", "%time test_eq(DataLoader(t, num_workers=2), letters)\n", "%time test_eq(DataLoader(t, num_workers=4), letters)\n", "\n", "dl = DataLoader(t, shuffle=True, num_workers=1)\n", "test_shuffled(L(dl), letters)\n", "test_shuffled(L(dl), L(dl))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 20.6 ms, sys: 17.5 ms, total: 38.1 ms\n", "Wall time: 83.2 ms\n" ] } ], "source": [ "class SleepyQueue():\n", " \"Simulate a queue with varying latency\"\n", " def __init__(self, q): self.q=q\n", " def __iter__(self):\n", " while True:\n", " time.sleep(random.random()/100)\n", " try: yield self.q.get_nowait()\n", " except queues.Empty: return\n", "\n", "q = Queue()\n", "for o in range(30): q.put(o)\n", "it = SleepyQueue(q)\n", "\n", "%time test_shuffled(L(DataLoader(it, num_workers=4)), range(30))" ] 
}, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class A(TensorBase): pass\n", "\n", "for nw in (0,2):\n", " t = A(tensor([1,2]))\n", " dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)\n", " b = first(dl)\n", " test_eq(type(b), A)\n", "\n", " t = (A(tensor([1,2])),)\n", " dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)\n", " b = first(dl)\n", " test_eq(type(b[0]), A)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class A(TensorBase): pass\n", "t = A(tensor(1,2))\n", "\n", "tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)\n", "b = first(tdl)\n", "test_eq(type(b), A)\n", "\n", "# Unknown attributes are delegated to `dataset`\n", "test_eq(tdl.pop(), tensor(1,2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Export -" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Converted 00_test.ipynb.\n", "Converted 01_core_foundation.ipynb.\n", "Converted 01a_core_utils.ipynb.\n", "Converted 01b_core_dispatch.ipynb.\n", "Converted 01c_core_transform.ipynb.\n", "Converted 02_core_script.ipynb.\n", "Converted 03_torchcore.ipynb.\n", "Converted 03a_layers.ipynb.\n", "Converted 04_data_load.ipynb.\n", "Converted 05_data_core.ipynb.\n", "Converted 06_data_transforms.ipynb.\n", "Converted 07_data_block.ipynb.\n", "Converted 08_vision_core.ipynb.\n", "Converted 09_vision_augment.ipynb.\n", "Converted 09a_vision_data.ipynb.\n", "Converted 10_pets_tutorial.ipynb.\n", "Converted 11_vision_models_xresnet.ipynb.\n", "Converted 12_optimizer.ipynb.\n", "Converted 13_learner.ipynb.\n", "Converted 13a_metrics.ipynb.\n", "Converted 14_callback_schedule.ipynb.\n", "Converted 14a_callback_data.ipynb.\n", "Converted 15_callback_hook.ipynb.\n", "Converted 15a_vision_models_unet.ipynb.\n", "Converted 16_callback_progress.ipynb.\n", "Converted 17_callback_tracker.ipynb.\n", "Converted 18_callback_fp16.ipynb.\n", "Converted 19_callback_mixup.ipynb.\n", "Converted 20_interpret.ipynb.\n", "Converted 20a_distributed.ipynb.\n", "Converted 21_vision_learner.ipynb.\n", "Converted 22_tutorial_imagenette.ipynb.\n", "Converted 23_tutorial_transfer_learning.ipynb.\n", "Converted 30_text_core.ipynb.\n", "Converted 31_text_data.ipynb.\n", "Converted 32_text_models_awdlstm.ipynb.\n", "Converted 33_text_models_core.ipynb.\n", "Converted 34_callback_rnn.ipynb.\n", "Converted 35_tutorial_wikitext.ipynb.\n", "Converted 36_text_models_qrnn.ipynb.\n", "Converted 37_text_learner.ipynb.\n", "Converted 38_tutorial_ulmfit.ipynb.\n", "Converted 40_tabular_core.ipynb.\n", "Converted 41_tabular_model.ipynb.\n", "Converted 42_tabular_rapids.ipynb.\n", "Converted 50_data_block_examples.ipynb.\n", "Converted 60_medical_imaging.ipynb.\n", "Converted 65_medical_text.ipynb.\n", "Converted 70_callback_wandb.ipynb.\n", "Converted 90_notebook_core.ipynb.\n", "Converted 91_notebook_export.ipynb.\n", "Converted 92_notebook_showdoc.ipynb.\n", "Converted 93_notebook_export2html.ipynb.\n", "Converted 94_notebook_test.ipynb.\n", "Converted 95_index.ipynb.\n", "Converted 96_data_external.ipynb.\n", "Converted 97_utils_test.ipynb.\n", "Converted notebook2jekyll.ipynb.\n" ] } ], "source": [ "#hide\n", "from local.notebook.export import notebook2script\n", "notebook2script(all_fs=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { 
"display_name": "Python 3", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 2 }