{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multi-Cloud Workflow with Pangeo\n",
"\n",
"\n",
"\n",
"This example demonstrates a workflow using analysis-ready data provided in two public clouds.\n",
"\n",
"* [LENS](https://registry.opendata.aws/ncar-cesm-lens/) (Hosted on AWS in the us-west-2 region)\n",
"* [ERA5](https://www.ecmwf.int/en/forecasts/datasets/reanalysis-datasets/era5) (Hosted on Google Cloud Platform in multiple regions)\n",
"\n",
"We'll perform a similar analysis on each dataset, a histogram of the total precipitation, and compare the results. Notably, this computation reduces a large dataset to a small summary, and the reduction can happen on a cluster in the cloud.\n",
"\n",
"By placing a compute cluster in the cloud next to the data, we avoid moving large amounts of data over the public internet. The large analysis-ready data only needs to move within a cloud region: from the machines storing the data in an object store such as S3 to the machines performing the analysis. The compute cluster reduces the large amount of data to a small histogram summary. At a few hundred kilobytes at most, the summary can easily be moved back to the local client, which might be running on a laptop. This also avoids costly egress charges from moving large amounts of data out of cloud regions."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"\n",
"import dask\n",
"from distributed import Client\n",
"from dask_gateway import Gateway, BasicAuth\n",
"import intake\n",
"import numpy as np\n",
"import s3fs\n",
"import xarray as xr\n",
"from xhistogram.xarray import histogram"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create Dask Clusters"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We've deployed [Dask Gateway](https://gateway.dask.org/) on two Kubernetes clusters, one in AWS and one in GCP. We'll use these to create [Dask](https://dask.org/) clusters in the same cloud region as the data. We'll connect to both of them from the same interactive notebook session."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
" ····\n"
]
}
],
"source": [
"password = getpass.getpass()\n",
"auth = BasicAuth(\"pangeo\", password)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
"\n",
"dask.array\n",
"Coordinates:\n",
" * latitude (latitude) float32 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0\n",
" * longitude (longitude) float32 0.0 0.25 0.5 0.75 ... 359.25 359.5 359.75\n",
" * time (time) datetime64[ns] 1990-01-01T02:30:00 ... 2005-12-31T20:30:00"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# convert to 6-hourly precip totals\n",
"tp_6hr = tp.coarsen(time=6).sum()\n",
"tp_6hr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll bin this data into the following bins."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([0.00000000e+00, 1.00000000e-05, 1.26485522e-05, 1.59985872e-05,\n",
" 2.02358965e-05, 2.55954792e-05, 3.23745754e-05, 4.09491506e-05,\n",
" 5.17947468e-05, 6.55128557e-05, 8.28642773e-05, 1.04811313e-04,\n",
" 1.32571137e-04, 1.67683294e-04, 2.12095089e-04, 2.68269580e-04,\n",
" 3.39322177e-04, 4.29193426e-04, 5.42867544e-04, 6.86648845e-04,\n",
" 8.68511374e-04, 1.09854114e-03, 1.38949549e-03, 1.75751062e-03,\n",
" 2.22299648e-03, 2.81176870e-03, 3.55648031e-03, 4.49843267e-03,\n",
" 5.68986603e-03, 7.19685673e-03, 9.10298178e-03, 1.15139540e-02,\n",
" 1.45634848e-02, 1.84206997e-02, 2.32995181e-02, 2.94705170e-02,\n",
" 3.72759372e-02, 4.71486636e-02, 5.96362332e-02, 7.54312006e-02,\n",
" 9.54095476e-02, 1.20679264e-01, 1.52641797e-01, 1.93069773e-01,\n",
" 2.44205309e-01, 3.08884360e-01, 3.90693994e-01, 4.94171336e-01,\n",
" 6.25055193e-01, 7.90604321e-01, 1.00000000e+00])"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"tp_6hr_bins = np.concatenate([[0], np.logspace(-5, 0, 50)])\n",
"tp_6hr_bins"
]
},
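{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough check on the bin layout (a sketch, not part of the original workflow): `np.logspace(-5, 0, 50)` yields 50 log-spaced edges from 1e-5 to 1, and prepending 0 gives 51 edges, hence 50 bins. The first bin, [0, 1e-5), collects the near-zero values. The `sample` array below is made up for illustration.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"# Same bin edges as in the cell above.\n",
"edges = np.concatenate([[0], np.logspace(-5, 0, 50)])\n",
"n_bins = len(edges) - 1\n",
"\n",
"# Hypothetical precipitation values, chosen only for illustration.\n",
"sample = np.array([0.0, 5e-6, 2e-5, 1e-3, 0.5])\n",
"counts, _ = np.histogram(sample, bins=edges)\n",
"\n",
"print(n_bins)        # number of bins\n",
"print(counts[0])     # samples falling in the first bin [0, 1e-5)\n",
"print(counts.sum())  # samples falling anywhere inside [0, 1]\n",
"```\n",
"\n",
"Because the edges are log-spaced, most of the resolution is spent on small precipitation totals, which is where most of the values lie in this kind of data."
]
},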
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The next cell applies the histogram to the `longitude` dimension and takes the mean over `time`.\n",
"We're still just building up the computation here; we haven't actually loaded the data or executed anything yet."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
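"<table>\n",
"  <thead>\n",
"    <tr><td></td><th>Array</th><th>Chunk</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>Bytes</th><td>288.40 kB</td><td>288.40 kB</td></tr>\n",
"    <tr><th>Shape</th><td>(721, 50)</td><td>(721, 50)</td></tr>\n",
"    <tr><th>Count</th><td>110889 Tasks</td><td>1 Chunks</td></tr>\n",
"    <tr><th>Type</th><td>float64</td><td>numpy.ndarray</td></tr>\n",
"  </tbody>\n",
"</table>"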
],
"text/plain": [
"dask.array"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"tp_hist = histogram(\n",
" tp_6hr.rename('tp_6hr'), bins=[tp_6hr_bins], dim=['longitude']\n",
").mean(dim='time')\n",
"tp_hist.data"
]
},
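{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the reduction concrete, here is a minimal NumPy sketch of the same idea on a small random array: count values per bin along the longitude axis, then average those counts over time. This is an illustration under assumed shapes and synthetic data, not how `xhistogram` is implemented; the helper name `hist_over_lon_mean_over_time` is hypothetical.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"def hist_over_lon_mean_over_time(data, edges):\n",
"    # data has shape (time, latitude, longitude).\n",
"    n_time, n_lat, _ = data.shape\n",
"    counts = np.empty((n_time, n_lat, len(edges) - 1))\n",
"    for t in range(n_time):\n",
"        for j in range(n_lat):\n",
"            # Histogram over the longitude axis only.\n",
"            counts[t, j], _ = np.histogram(data[t, j], bins=edges)\n",
"    # Average the per-timestep counts over time.\n",
"    return counts.mean(axis=0)\n",
"\n",
"edges = np.concatenate([[0], np.logspace(-5, 0, 50)])\n",
"rng = np.random.default_rng(0)\n",
"# Synthetic values in [0, 1), standing in for precipitation totals.\n",
"data = rng.random((4, 3, 16)) ** 4\n",
"result = hist_over_lon_mean_over_time(data, edges)\n",
"print(result.shape)  # (latitude, bins)\n",
"```\n",
"\n",
"In this sketch every value falls in exactly one bin, so each latitude row of `result` sums to the number of longitude points."
]
},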
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In total, we're going from the ~1.5TB raw dataset down to a small 288 kB result that is the histogram summarizing the total precipitation. We've built up a large sequence of operations to do that reduction (over 110,000 individual tasks), and now it's time to actually execute it. There will be some delay between running the next cell, the scheduler receiving the task graph, and the cluster starting to process it, but work is happening in the background. After a minute or so, tasks will start appearing on the Dask dashboard.\n",
"\n",
"One thing to note: we request this result with the `gcp_client`, the client for the cluster in the same cloud region as the data."
]
},
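{
"cell_type": "markdown",
"metadata": {},
"source": [
"The 288 kB figure follows directly from the shape of the result: 721 latitudes by 50 bins of 8-byte floats. A quick back-of-the-envelope check, taking the ~1.5 TB input size quoted above as an approximation:\n",
"\n",
"```python\n",
"# Shape and dtype of the histogram result shown above.\n",
"n_lat, n_bins, bytes_per_float64 = 721, 50, 8\n",
"result_bytes = n_lat * n_bins * bytes_per_float64\n",
"\n",
"# Approximate input size, using the ~1.5 TB figure from the text.\n",
"input_bytes = 1.5e12\n",
"\n",
"print(result_bytes)  # 288400 bytes, i.e. 288.40 kB\n",
"print(f'reduction factor: about {input_bytes / result_bytes:.1e}')\n",
"```\n",
"\n",
"A reduction of this size is why only the summary needs to leave the cloud region."
]
},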
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"era5_tp_hist_ = gcp_client.compute(tp_hist, retries=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`era5_tp_hist_` is a `Future` pointing to the result on the cluster. The actual computation is happening in the background, and we'll call `.result()` to get the concrete result later on."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"Future: finalize status: pending, key: finalize-827454f3f45ccd1c7c22f0b3907c098c"
],
"text/plain": [
""
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"era5_tp_hist_"
]
},
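{
"cell_type": "markdown",
"metadata": {},
"source": [
"The submit-now, collect-later pattern can be tried without a cluster. The sketch below uses the standard library's `concurrent.futures` as a stand-in, since Dask futures expose a similar `.result()` method. `slow_sum` is a hypothetical placeholder for the real computation, and nothing here touches Dask Gateway.\n",
"\n",
"```python\n",
"import time\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def slow_sum(values):\n",
"    # Placeholder for an expensive computation.\n",
"    time.sleep(0.1)\n",
"    return sum(values)\n",
"\n",
"with ThreadPoolExecutor(max_workers=1) as pool:\n",
"    # submit() returns immediately with a future.\n",
"    future = pool.submit(slow_sum, range(10))\n",
"    # result() blocks until the value is ready.\n",
"    total = future.result()\n",
"\n",
"print(total)\n",
"```\n",
"\n",
"The notebook follows the same shape: it submits both histograms first and only blocks on `.result()` once both clusters are already working."
]
},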
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the Dask cluster is in adaptive mode, this computation has kicked off a chain of events: Dask has noticed that it suddenly has many tasks to compute, so it asks the cluster manager (Kubernetes in this case) for more workers. The Kubernetes cluster then asks its compute backend (Google Compute Engine in this case) for more virtual machines. As these machines come online, our workers will come to life and the cluster will start making progress on our computation."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## LENS on AWS\n",
"\n",
"This computation is very similar to the ERA5 computation. The primary difference is that the LENS dataset is an ensemble. We'll histogram a single member of that ensemble.\n",
"\n",
"The Intake catalog created by NCAR includes many things, so we'll use `intake-esm` to search for the URL we want."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"This data is part of the project 'Blind Evaluation of Lossy Data-Compression in LENS'. Please exercise caution before using this data for other purposes."
],
"text/plain": [
"dask.array"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lens_hist.data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that we're using the `aws_client`, because LENS is stored in an AWS region."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"lens_hist_ = aws_client.compute(lens_hist)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compare results\n",
"\n",
"Let's plot the histograms for both the ERA5 and LENS data. These are small results so it's safe to transfer the result from the cluster to the client machine for plotting."
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"lens_tp_hist_ = lens_hist_.result()\n",
"era5_tp_hist_ = era5_tp_hist_.result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For ERA5:"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"era5_tp_hist_[:, 1:].plot(xscale='log');"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And for LENS:"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
""
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"lens_tp_hist_[:, 1:].plot(xscale='log');"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cleanup\n",
"\n",
"Closing the clients and their clusters frees all of our resources."
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"aws_client.close()\n",
"aws.close()\n",
"\n",
"gcp_client.close()\n",
"gcp.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Behind the Scenes\n",
"\n",
"We deployed some infrastructure to make this notebook runnable. In line with one of Pangeo's guiding principles, each of these technologies has an [open architecture](https://medium.com/pangeo/closed-platforms-vs-open-architectures-for-cloud-native-earth-system-analytics-1ad88708ebb6).\n",
"\n",
"From low-level to high-level:\n",
"\n",
"* **Terraform** provides the tools for provisioning the cloud resources needed for the clusters.\n",
"* **Kubernetes** provides the container orchestration for deploying the Dask clusters. We created Kubernetes clusters in AWS's `us-west-2` and GCP's `us-central1` regions.\n",
"* **Dask Gateway** provides centralized, secure access to Dask clusters. It was deployed using [Helm](https://helm.sh/) on the two Kubernetes clusters.\n",
"* **Dask** provides scalable, distributed computation for analyzing these large datasets.\n",
"* **xarray** provides high-level APIs and high-performance data structures for working with this data.\n",
"* **Intake, gcsfs, s3fs** provide catalogs for data discovery and libraries for loading that data.\n",
"* **JupyterLab** provides a user interface for interactive computing. The client laptop interacts with the clusters through JupyterLab.\n",
"\n",
"\n",
"All of the resources for this demo are available at https://github.com/pangeo-data/multicloud-demo."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.2"
}
},
"nbformat": 4,
"nbformat_minor": 4
}