= A First Look at Dask on ARM on K8s
:uri-asciidoctor: http://asciidoctor.org

After getting the cluster set up in the previous post, it was time to finally play with Dask on the cluster. Thankfully, the link:$$https://github.com/dask/dask-kubernetes$$[dask-kubernetes] and link:$$https://github.com/dask/dask-docker$$[dask-docker] projects provide the framework to do this. Since I'm still new to Dask, I decided to start off by using Dask from a local notebook (in retrospect, maybe not the best choice).

== Getting Dask on ARM in Docker

The dask-docker project gives us a good starting point for building a container for Dask, but the project's containers are only built for amd64. I started off by trying to rebuild the containers without any modifications, but it turned out there were a few issues I needed to address. The first was that the regular conda docker image is also only built for amd64. The second was that some of the packages the Dask container uses are not yet cross-built either. While these problems will likely go away over the coming year, for the time being I solved them by making a multi-platform conda-forge based docker container, asking folks to rebuild packages, and, when the packages did not get rebuilt, installing from source.

To do this, I created a new Dockerfile that replaces the miniconda base with miniforge:

[source, dockerfile]
----
include::../subrepos/dask-docker/miniforge/Dockerfile[]
----

Most of the logic lives in this setup script:

[source, sh]
----
include::../subrepos/dask-docker/miniforge/setup.sh[]
----

I chose to install mamba, a fast C++ reimplementation of conda, and use it to install the rest of the packages. I did this because debugging package conflicts with the regular conda program was producing confusing error messages, and mamba tends to give clearer ones.

I then created a new version of dask-docker's "base" Dockerfile, which installs the packages with mamba, falling back to pip when they are not available from conda:

[source, dockerfile]
----
include::../subrepos/dask-docker/base/Dockerfile[]
----

One interesting thing I noticed while exploring this is the link:$$https://github.com/dask/dask-docker/blob/master/base/prepare.sh$$[prepare.sh script] that is used as the entry point for the container. This script checks a few different environment variables that, when present, are used to install additional packages (Python or system) at container startup. While baking all of the packages into the container up front is normally best (since startup-time installations can be flaky and slow), this does allow for faster experimentation. At first glance it seems like adding a new package this way still requires a Dask cluster restart, but I'm going to do more exploring here.

== Getting Dask on Kube

With the containers built, the next step was trying to get them running on Kubernetes. I first tried the helm installation, but I wasn't sure how to configure it to use my new custom containers, and the documentation also contained warnings that Dask with helm did not play well with dynamic scaling. Since I'm really interested in exploring how the different systems support dynamic scaling, I decided to use the dask-kubernetes project instead.

With dask-kubernetes, I can create a cluster by running:

[source, python]
----
include::../subrepos/scalingpythonml/dask-examples/TestNB1.py[tags=create_in_default]
----
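
If you haven't seen dask-kubernetes before, the general shape of that kind of snippet is sketched below. This is an illustration rather than my actual notebook code: the image name is a placeholder, and the "EXTRA_PIP_PACKAGES" variable is one of the variables the upstream prepare.sh checks (at the time of writing), so adjust it for your own containers.

[source, python]
----
from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

# Placeholder image name -- substitute the multi-arch container built above.
pod_spec = make_pod_spec(
    image="registry.example.com/dask-arm64:latest",
    # prepare.sh reads this at container startup and pip-installs the extras.
    env={"EXTRA_PIP_PACKAGES": "s3fs"},
)

cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=0, maximum=4)  # or cluster.scale(2) for a fixed size
client = Client(cluster)
print(client.dashboard_link)
----

The adapt call is the hook for the dynamic scaling I mentioned above; scale gives you a fixed number of workers instead.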

As I was setting this up, I realized it was creating resources in the default namespace, which made keeping track of everything difficult. So I created a namespace, service account, and role binding so that I could better keep track of (and clean up) everything:

[source, sh]
----
include::../subrepos/scalingpythonml/dask-examples/setup.sh[tags=setup_namespace]
----

To use this, I added another parameter to the cluster creation call and updated the yaml:

[source, python]
----
include::../subrepos/scalingpythonml/dask-examples/TestNB1.py[tags=create_in_namespace]
----

Using from_yaml is important, as it lets me specify particular containers and resource requests (which will be useful when working with GPUs). I modified the standard worker-spec to use the namespace and service account I created:

[source, yaml]
----
include::../subrepos/scalingpythonml/dask-examples/worker-spec.yaml[]
----

While this would work if I were running _inside_ the Kubernetes cluster, I wanted to start with an experimental notebook outside the cluster. This required some changes and, in retrospect, is not where I should have started.

== Dask in Kube with Notebook access

There are two primary considerations when setting up Dask for notebook access on Kube. The first is where you want your notebook to run: inside the Kubernetes cluster, or outside (e.g. on your own machine). The second is whether you want the Dask scheduler to run alongside your notebook or in a separate container inside of Kube.

The first configuration I tried was having a notebook on my local machine. At first, I could not get it working because the scheduler was running on my local machine and could not talk to the worker pods it spun up. That's why, unless you're using host networking, I recommend having the scheduler run inside the cluster. Doing this involves adding a "deploy_mode" keyword to your KubeCluster invocation and asking Dask to create a service for your notebook to talk to the scheduler.

[source, python]
----
include::../subrepos/scalingpythonml/dask-examples/TestNB2.py[tags=remote_lb_deploy]
----

Running your notebook on a local machine _may_ make getting started faster, but it comes with some downsides. It's important that you keep your client's python environment in sync with the worker/base containers. To get my conda env to match, I ended up having to run:

[source, sh]
----
include::../subrepos/scalingpythonml/dask-examples/setup.sh[tags=setup_conda]
----

Another big issue you'll likely run into is that transient network errors between your notebook and the cluster can result in non-recoverable errors. This has happened to me even with networking all inside my house, so I can imagine it would be even more common with a VPN or a cloud provider network involved.

The final challenge I ran into was with I/O. Some code will run in the workers and some will run on the client, and if your workers and client have a different view of the network, or if there are resources that are only available inside the cluster (for me, MinIO), the error messages can be confusing footnote:[I worked around this by setting up port-forwarding so that the network environment was the same between my local machine and the cluster. You could also expose the internal-only resources through a service and have internal & external access through the service, but I just wanted a quick stop-gap. This challenge convinced me I should re-run with my notebook inside the cluster.].

NOTE: You don't have to use Dask with Kubernetes, or even a cluster. If you don't have a cluster, or have a problem where a cluster might not be the best solution, Dask also supports other execution environments, like local multi-threading, as well as GPU acceleration. I'm personally excited to see how GPU acceleration can be used together with Kubernetes.

== The different APIs

Dask exposes a few different APIs for distributed programming at different levels of abstraction. Dask's "core" building block is the delayed API, on top of which the collections and DataFrame support is built. The delayed API is notably lower level than Spark's low-level public APIs -- and I'm super interested to see what kinds of things it enables us to do.
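
To give a feel for the delayed API, here is a tiny, self-contained sketch (illustrative only, not taken from my notebooks):

[source, python]
----
import dask


@dask.delayed
def inc(x):
    # Each call just records a task in the graph; nothing runs yet.
    return x + 1


@dask.delayed
def add(a, b):
    return a + b


# Build a small task graph lazily.
parts = [inc(i) for i in range(10)]
total = add(dask.delayed(sum)(parts), 100)

# compute() runs the graph on whatever scheduler is active -- the local
# threaded scheduler by default, or the Kube cluster when a Client is connected.
print(total.compute())  # 155
----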

Dask has three different types of distributed collection APIs: Bag, DataFrame, and Array. These distributed collections map relatively nicely to common Python concepts, and the DataFrame API is especially familiar.

Almost separatefootnote:[You can use the actor API within the other APIs, but it is not part of the same building blocks.] from the delayed and collections APIs, Dask also has an (experimental) Actor API. I'm curious to see how this API continues to be developed and used. I'd really like to see if I can use it as a parameter server.

To verify my cluster was properly set up, I did a quick run through the tutorials for the different APIs.

== Next Steps

Now that I've got Dask on Kube running on my cluster, I want to do some cleanup and then explore more about how Dask handles dataframes, partitioning/distributing data and tasks, auto-scaling, and GPU acceleration. If you've got any suggestions for things you'd like me to try out, please do get in touch :)