{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## reading gadget files with dask natively in *yt*\n", "\n", "In this notebook, we show a simple application of the dask chunking experiments for particle reading within *yt* itself. \n", "\n", "Several changes are necessary for this to work:\n", "1. we need to modify how the `BaseIOHandler` reads in particle data from files. \n", "2. modify the `ParticleContainer` so that our chunks can be effectively pickled\n", "3. use *yt*'s on-disk parameter storage option\n", "\n", "\n", "### 1. modifying the `BaseIOHandler` \n", "\n", "One way to use `dask` effectively is to send stack each chunk's read in a delayed dataframe. In this approach, we have a delayed dataframe for each particle type we're reading where each dataframe is partitioned by `dask` over the chunks we supply. Dask delayed dataframes don't require the size a priori, which simplifies building the dataframe but we do need to supply a metadata dictionary to designate the expected datatype for each column. The modified `_read_particle_selection` is not yet on a committed branch, so here's the function: \n", "\n", "```python\n", " def _read_particle_selection(self, chunks, selector, fields):\n", " rv = {}\n", " ind = {}\n", " # We first need a set of masks for each particle type\n", " ptf = defaultdict(list) # ptype -> on-disk fields to read\n", " fsize = defaultdict(lambda: 0) # ptype -> size of return value\n", " psize = defaultdict(lambda: 0) # ptype -> particle count on disk\n", " field_maps = defaultdict(list) # ptype -> fields (including unions)\n", " chunks = list(chunks)\n", " unions = self.ds.particle_unions\n", " # What we need is a mapping from particle types to return types\n", " for field in fields:\n", " ftype, fname = field\n", " fsize[field] = 0\n", " # We should add a check for p.fparticle_unions or something here\n", " if ftype in unions:\n", " for pt in unions[ftype]:\n", " ptf[pt].append(fname)\n", " field_maps[pt, fname].append(field)\n", " else:\n", " ptf[ftype].append(fname)\n", " field_maps[field].append(field)\n", " # Now we have our full listing\n", "\n", " # an experimental dask approach\n", "\n", " # build the meta dictionary for each chunk's dataframe so that empty chunks\n", " # don't cause problems.\n", " ptype_meta = {}\n", " for ptype, flds in ptf.items():\n", " meta_dict = {}\n", " for fld in flds:\n", " meta_dict[fld] = pd.Series([], dtype=np.float64)\n", " ptype_meta[ptype] = meta_dict\n", "\n", " # build the delayed chunk reader (still has parallel issues...)\n", " # particle_field_args = self._read_particle_field_args()\n", " ptypes = list(ptf.keys())\n", " delayed_dfs = {}\n", " for ptype in ptypes:\n", " # build a dataframe from delayed for each particle type\n", " this_ptf = {ptype: ptf[ptype]}\n", " delayed_chunks = [\n", " # all these things need to be pickleable...\n", " dask.delayed(self._read_single_ptype)(\n", " ch, this_ptf, selector, ptype_meta[ptype]\n", " )\n", " for ch in chunks\n", " ]\n", " delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])\n", "\n", " # up to here, everything is delayed w dask, need to read into into memory across\n", " # chunks to return rv: \n", " rv = {}\n", " for ptype in ptypes:\n", " for col in delayed_dfs[ptype].columns:\n", " rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()\n", "\n", " # but if we returned the delayed dataframes, could do things like this:\n", " # delayed_dfs['PartType0'].Density.mean().compute()\n", "\n", " return rv\n", "\n", " def _read_single_ptype(self, chunk, ptf, selector, meta_dict):\n", " # read a single chunk and single particle type into a pandas dataframe so that \n", " # we can use dask.dataframe.from_delayed! fields within a particle type should\n", " # have the same length?\n", "\n", " chunk_results = pd.DataFrame(meta_dict)\n", " # each particle type could be a different dataframe...\n", " for field_r, vals in self._read_particle_fields([chunk], ptf, selector):\n", " chunk_results[field_r[1]] = vals\n", " return chunk_results\n", "```\n", "\n", "### 2. modifying the `ParticleContainer` \n", "\n", "A single chunk is composed of various particle continers, and there are several modifications we need to make to the `ParticleContainer` related to pickleability. So first off, the `ParticleContainer` on *yt*'s master branch currently cannot be pickled as it inherits a `__reduce__` method from the `Dataset` class that requires the class to be a registered method, which the `ParticleContainer` is not. The `particleIndexRefactor` branch at github.com/chrishavlin/yt does just that by overriding the `__reduce__` method in `ParticleContainer`. \n", "\n", "The other important bit in the `particleIndexRefactor` branch is that it adds an explicit `base_selector` argument to `__init__` rather than pulling out the selector object from the `base_region` argument. The reason for this is that when the arguments are unpickled, the `base_region` object will not have an initialized index. And so when `base_region.selector` is accessed, it first triggers an index-read. Reading the index for every dask process is very slow. But since the majority of the selector objects can now be pickled directly, by adding it as an argument we can avoid that index read. \n", "\n", "\n", "### 3. on-disk parameter storage option\n", "\n", "When *yt* pickles a `Dataset` object, it uses a `__reduce__` method that stores the object arguments used to build the `Dataset` and then re-initializes the dataset on unpickling. Those arguments are kept in the `ParameterStore` object, which under normal operation is an in-memory cache of parameters. Since the parameter store is in memory, it's not available to the separate dask tasks. But there is a *yt* config option that stores these parameters in an on-disk csv instead. Setting `StoreParameterFiles = True` in the yt config will allow the separate dask tasks to read the paramter file and reconstitute the dataset as needed. \n", "\n", "## Single Processor Read \n", "\n", "Ok, so first let's use a single processor, in which case nothing changes in terms of user experience:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import yt " ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "yt : [WARNING ] 2020-11-06 15:09:12,944 tqdm is not installed, progress bar can not be displayed.\n", "yt : [INFO ] 2020-11-06 15:09:13,303 Files located at /home/chris/hdd/data/yt_data/yt_sample_sets/snapshot_033.tar.gz.untar/snapshot_033/snap_033.\n", "yt : [INFO ] 2020-11-06 15:09:13,304 Default to loading snap_033.0.hdf5 for snapshot_033 dataset\n", "yt : [INFO ] 2020-11-06 15:09:13,397 Parameters: current_time = 4.343952725460923e+17 s\n", "yt : [INFO ] 2020-11-06 15:09:13,399 Parameters: domain_dimensions = [1 1 1]\n", "yt : [INFO ] 2020-11-06 15:09:13,400 Parameters: domain_left_edge = [0. 0. 0.]\n", "yt : [INFO ] 2020-11-06 15:09:13,400 Parameters: domain_right_edge = [25. 25. 25.]\n", "yt : [INFO ] 2020-11-06 15:09:13,402 Parameters: cosmological_simulation = 1\n", "yt : [INFO ] 2020-11-06 15:09:13,402 Parameters: current_redshift = -4.811891664902035e-05\n", "yt : [INFO ] 2020-11-06 15:09:13,403 Parameters: omega_lambda = 0.762\n", "yt : [INFO ] 2020-11-06 15:09:13,403 Parameters: omega_matter = 0.238\n", "yt : [INFO ] 2020-11-06 15:09:13,404 Parameters: omega_radiation = 0.0\n", "yt : [INFO ] 2020-11-06 15:09:13,404 Parameters: hubble_constant = 0.73\n", "yt : [INFO ] 2020-11-06 15:09:13,499 Allocating for 4.194e+06 particles\n", "Loading particle index: 100%|██████████| 12/12 [00:00<00:00, 170.64it/s]\n" ] } ], "source": [ "ds = yt.load_sample(\"snapshot_033\")\n", "ad = ds.all_data()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check out some chunk info:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "12" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ds.index._identify_base_chunk(ad)\n", "chunks = list(ds.index._chunk_io(ad))\n", "len(chunks)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So we have 12 chunks for this dataset. Ok, let's read in a field:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 109 ms, sys: 24.4 ms, total: 134 ms\n", "Wall time: 127 ms\n" ] } ], "source": [ "%%time\n", "si = ad[('PartType4','Silicon')]" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "unyt_array([0.00313673, 0.0035234 , 0.00203385, ..., 0.00464466,\n", " 0.00060355, 0.00029561], '(dimensionless)')" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "si" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## parallel read \n", "\n", "OK, so now if we spin up a dask client, the delayed dataframes that we build in `_read_particle_selection` will automatically be split up across our chunks! So let's spin up a client, using 4 workers and 3 threads per worker, so each chunk will get its own process:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client " ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "c = Client(threads_per_worker=3,n_workers=4)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<table style=\"border: 2px solid white;\">\n", "<tr>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Client</h3>\n", "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", " <li><b>Scheduler: </b>tcp://127.0.0.1:41561</li>\n", " <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>\n", "</ul>\n", "</td>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Cluster</h3>\n", "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", " <li><b>Workers: </b>4</li>\n", " <li><b>Cores: </b>12</li>\n", " <li><b>Memory: </b>33.24 GB</li>\n", "</ul>\n", "</td>\n", "</tr>\n", "</table>" ], "text/plain": [ "<Client: 'tcp://127.0.0.1:41561' processes=4 threads=12, memory=33.24 GB>" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and read in a couple different fields (so that we're not just reading the cached field):" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 305 ms, sys: 60.2 ms, total: 365 ms\n", "Wall time: 3.89 s\n" ] } ], "source": [ "%%time\n", "d = ad[('PartType0','Density')]" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "unyt_array([ 6577205. , 15850306. , 6765328.5, ..., 4304681.5,\n", " 5155429.5, 7586393.5], 'code_mass/code_length**3')" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "d" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 126 ms, sys: 17.5 ms, total: 144 ms\n", "Wall time: 572 ms\n" ] } ], "source": [ "%%time\n", "ox = ad[('PartType4','Oxygen')]" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 151 ms, sys: 32.1 ms, total: 183 ms\n", "Wall time: 646 ms\n" ] } ], "source": [ "%%time\n", "T = ad[('PartType0','Temperature')]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That initial density read takes a bit of extra time (due to some internal dask initialization?) but we can see subsequent reads of other fields are much faster. The CPU time are fairly close to the single processor read, but we can see the wall time is 400-500 seconds slower for the parallel read, which can be attributed to the extra communication overhead between processes.\n", "\n", "Here's a screenshot of the Task Stream during the `Temperature` read:\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "c.close()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "c = Client(threads_per_worker=1,n_workers=4)" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<table style=\"border: 2px solid white;\">\n", "<tr>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Client</h3>\n", "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", " <li><b>Scheduler: </b>tcp://127.0.0.1:37585</li>\n", " <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>\n", "</ul>\n", "</td>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Cluster</h3>\n", "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", " <li><b>Workers: </b>4</li>\n", " <li><b>Cores: </b>4</li>\n", " <li><b>Memory: </b>33.24 GB</li>\n", "</ul>\n", "</td>\n", "</tr>\n", "</table>" ], "text/plain": [ "<Client: 'tcp://127.0.0.1:37585' processes=4 threads=4, memory=33.24 GB>" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 340 ms, sys: 26.4 ms, total: 366 ms\n", "Wall time: 4.1 s\n" ] } ], "source": [ "%%time\n", "met = ad[('PartType4','Metallicity')]" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 147 ms, sys: 3.92 ms, total: 151 ms\n", "Wall time: 661 ms\n" ] } ], "source": [ "%%time \n", "hel = ad[('PartType4','Helium')]" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "c.close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.8" } }, "nbformat": 4, "nbformat_minor": 4 }