{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# rasterio, dask, xarray and parallel processing\n", "\n", "The goal of this notebook is to compare rasterio's [concurrent processing example](https://rasterio.readthedocs.io/en/latest/topics/concurrency.html) with equivalent workflows using dask and xarray.\n", "See also:\n", "https://github.com/mapbox/rasterio/pull/2010\n", "\n", "It'd be good to clarify:\n", "\n", "- thread safety and need (or no need) for locks?\n", "- what complications or benefits does xr.open_rasterio() bring? aka when to maybe just use dask and rasterio?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the script and data file for rasterio example\n", "# !wget -nc https://raw.githubusercontent.com/mark-boer/rasterio/master/examples/thread_pool_executor.py\n", "# !wget -nc https://raw.githubusercontent.com/mapbox/rasterio/master/tests/data/RGB.byte.tif" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!time python thread_pool_executor.py RGB.byte.tif test.tif -j 1" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "real\t0m4.926s\n", "user\t0m4.889s\n", "sys\t0m0.358s" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!time python thread_pool_executor.py RGB.byte.tif test.tif -j 4" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "real\t0m1.722s\n", "user\t0m4.871s\n", "sys\t0m0.368s" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "Takeaway: it's 3-4x faster running CPU-bound code on 4 parallel threads instead of 1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Let's try to accomplish the same w/ xarray + dask\n", "\n", "Note: seems best to restart the kernel for each of these timing blocks to ensure cache isn't used" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# NOTE: CPU-intensive function that operates on a numpy array & \"reverses bands inefficiently\"\n", "from rasterio._example import compute\n", "\n", "from dask.distributed import Client, LocalCluster, progress\n", "import dask\n", "import rioxarray as rioxr\n", "import xarray as xr\n", "\n", "with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster, Client(cluster) as client:\n", " \n", " #equivalent to 'block_windows' should be aligned with tif blocks for efficiency\n", " da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128)) \n", " \n", " # returns DataArray, we lose attributes though \n", " val = xr.apply_ufunc(compute, da, dask='parallelized')\n", " \n", " # write to geotiff\n", " val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')" ] }, { "cell_type": "raw", "metadata": {}, "source": [ "CPU times: user 6.27 s, sys: 424 ms, total: 6.69 s\n", "Wall time: 7.49 s\n", "\n", "Takeway: For the single-thread case we'd just be adding a bunch of overhead, but the code is easy to read and seems to work" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from rasterio._example import compute\n", "from dask.distributed import Client, LocalCluster, progress\n", "import dask\n", "import rioxarray as rioxr\n", "import xarray as xr\n", "\n", "with LocalCluster(processes=False) as cluster, Client(cluster) as client:\n", " print(client)\n", " \n", " #equivalent to 'block_windows' should be aligned with tif blocks for efficiency\n", " # better to use 
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "from rasterio._example import compute\n",
    "from dask.distributed import Client, LocalCluster\n",
    "import rioxarray as rioxr\n",
    "import xarray as xr\n",
    "\n",
    "with LocalCluster(processes=False) as cluster, Client(cluster) as client:\n",
    "    print(client)\n",
    "\n",
    "    # Equivalent to iterating over 'block_windows'; chunks should be aligned with the file's internal blocks for efficiency.\n",
    "    # Might map_blocks be a better fit here?\n",
    "    da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128))\n",
    "\n",
    "    # apply_ufunc returns a DataArray, though we lose the attributes\n",
    "    val = xr.apply_ufunc(compute, da, dask='parallelized')\n",
    "\n",
    "    # write to GeoTIFF\n",
    "    val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "CPU times: user 4.68 s, sys: 12 ms, total: 4.7 s\n",
    "Wall time: 2.65 s\n",
    "\n",
    "Takeaway: faster, but still about 1 s slower than plain rasterio with 4 threads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "# NOTE: first run ~3 s, second run ~1.3 s (likely file-system caching)\n",
    "\n",
    "# NOTE: compute() is a CPU-intensive function that operates on a numpy array and \"reverses bands inefficiently\"\n",
    "from rasterio._example import compute\n",
    "import rioxarray as rioxr\n",
    "import xarray as xr\n",
    "\n",
    "# NOTE: without a dask.distributed cluster, dask defaults to the threaded scheduler with num_workers = number of CPUs\n",
    "# (https://docs.dask.org/en/latest/setup/single-machine.html), but you can change the defaults like this:\n",
    "# from multiprocessing.pool import ThreadPool\n",
    "# import dask\n",
    "# dask.config.set(pool=ThreadPool(4))\n",
    "\n",
    "# Equivalent to iterating over 'block_windows'; chunks should be aligned with the file's internal blocks for efficiency\n",
    "da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128))\n",
    "\n",
    "# apply_ufunc returns a DataArray, though we lose the attributes\n",
    "val = xr.apply_ufunc(compute, da, dask='parallelized')\n",
    "\n",
    "# write to GeoTIFF\n",
    "val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "CPU times: user 5.83 s, sys: 542 ms, total: 6.37 s\n",
    "Wall time: 2.73 s\n",
    "\n",
    "Takeaway: same as above; the only change is using dask's single-machine threaded scheduler rather than a LocalCluster from dask.distributed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# check that the output file isn't nonsense\n",
    "!gdalinfo test.tif"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python [conda env:notebook] *",
   "language": "python",
   "name": "conda-env-notebook-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}