# rasterio, dask, xarray and parallel processing

The goal of this notebook is to compare rasterio's [concurrent processing example](https://rasterio.readthedocs.io/en/latest/topics/concurrency.html) with equivalent workflows using dask and xarray.
See also:
https://github.com/mapbox/rasterio/pull/2010

It'd be good to clarify:

- thread safety and need (or no need) for locks?
- what complications or benefits does xr.open_rasterio() bring? aka when to maybe just use dask and rasterio?

In [None]:
# Get the script and data file for rasterio example
# !wget -nc https://raw.githubusercontent.com/mark-boer/rasterio/master/examples/thread_pool_executor.py
# !wget -nc https://raw.githubusercontent.com/mapbox/rasterio/master/tests/data/RGB.byte.tif

In [None]:
!time python thread_pool_executor.py RGB.byte.tif test.tif -j 1

In [None]:
!time python thread_pool_executor.py RGB.byte.tif test.tif -j 4

# Let's try to accomplish the same w/ xarray + dask

Note: seems best to restart the kernel for each of these timing blocks to ensure cache isn't used

In [None]:
%%time

# NOTE: CPU-intensive function that operates on a numpy array & "reverses bands inefficiently"
from rasterio._example import compute

from dask.distributed import Client, LocalCluster, progress
import dask
import rioxarray as rioxr
import xarray as xr

with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster, Client(cluster) as client:
 
 #equivalent to 'block_windows' should be aligned with tif blocks for efficiency
 da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128)) 
 
 # returns DataArray, we lose attributes though 
 val = xr.apply_ufunc(compute, da, dask='parallelized')
 
 # write to geotiff
 val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')

In [None]:
%%time
from rasterio._example import compute
from dask.distributed import Client, LocalCluster, progress
import dask
import rioxarray as rioxr
import xarray as xr

with LocalCluster(processes=False) as cluster, Client(cluster) as client:
 print(client)
 
 #equivalent to 'block_windows' should be aligned with tif blocks for efficiency
 # better to use map_blocks?
 da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128)) 
 
 # returns DataArray, we lose attributes though 
 val = xr.apply_ufunc(compute, da, dask='parallelized')
 
 # write to geotiff
 val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')

In [None]:
%%time

# NOTE: first run 3 sec, second run 1.3 s (due to dask caching)

# NOTE: CPU-intensive function that operates on a numpy array & "reverses bands inefficiently"
from rasterio._example import compute
import rioxarray as rioxr
import xarray as xr


# NOTE: not specifying cluster config w/ dask.distributed defaults to 'ThreadPool' with num_workers=CPUs 
# https://docs.dask.org/en/latest/setup/single-machine.html, but you can change defaults like this:
#from multiprocessing.pool import ThreadPool
#import dask
#dask.config.set(pool=ThreadPool(4))

#equivalent to 'block_windows' should be aligned with tif blocks for efficiency
da = rioxr.open_rasterio('RGB.byte.tif', chunks=dict(x=128, y=128)) 

# returns DataArray, we lose attributes though 
val = xr.apply_ufunc(compute, da, dask='parallelized')

# write to geotiff
val.rio.to_raster('test.tif', dtype='uint8', driver='GTiff')

In [None]:
# check that output file isn't nonsense
!gdalinfo test.tif