{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ " # Distributed Machine Learning Pipeline: NDVI ~ Soil + Weather Dynamics\n", " \n", "This tutorial walks thru a machine learning pipeline. This example excludes the *Extract* component in the often referenced *ETL* (Extract, Transform, Learn) machine learning nomenclature. The overall goal of this analysis is to predict NDVI dynamics from soil and lagged precipitation, temperature, and vapor pressure deficit observations. The brief outline of the tutorial is:\n", "\n", "1. Read and transform the NDVI, Soil, and Weather data.\n", "2. Merge the three datasets and add 26 weekly lags of precipitation, vpd, and temperature as features.\n", "3. Shuffle and split data into three groups:\n", " * 3% for hyperparameter optimization (Group 1)\n", " * 97 % for final model\n", " * 77.6% (97% * 80%) for final model training (Group 2)\n", " * 19.4% (97% * 20%) for final model testing (validation) (Group 3)\n", "3. Optimize the hyperparamters in an XGBoost model (Xtreme Gradient Boosting) using a small subset of the data.\n", "4. Using the \"best fit\" hyperparameters, train the model 77.6% of the data (Group 2).\n", "5. Validation with the test (hold-out) data (19.4% - Group 3)\n", "\n", "## Table of Contents\n", "1. [Build a Distributed Cluster](#build-a-distributed-cluster)\n", "2. [Preprocess, Transform, and Merge the Data](#preprocess-transfor-and-merge-the-data)\n", "3. [Machine Learning: XGBoost Model](#machine-learning-xgboost-model)\n", "4. [Interpreting the Model](#interpreting-the-model)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import xarray as xr\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import dask_jobqueue as jq\n", "import dask\n", "from dask import dataframe as ddf\n", "from dask import array as da\n", "import os\n", "from dask.distributed import Client, wait\n", "import pandas as pd\n", "from sklearn.base import BaseEstimator, TransformerMixin\n", "from tqdm.notebook import tqdm" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Build a Distributed Cluster\n", "\n", "We will use [dask-jobqueue](https://jobqueue.dask.org/en/latest/) to launch and scale a cluster. For a more detailed example of how this works, please see the other tutorials in the SCINet Geospatial 2020 Workshop. For a quick review, the workflow for defining a cluster and scaling is:
\n", " 1. Dask-jobqueue submits jobs to Slurm with an sbatch script\n", " 2. The sbatch scripts define the dask workers with the following info:\n", " * Partition to launch jobs/workers (```partition```)\n", " * X number of processes (i.e. dask workers) per sbatch script (```num_processes```).\n", " * Number of threads/cpus per dask worker (```num_threads_per_process```)\n", " * Memory allocated per sbatch scipt (```mem```), which is spread evenly between the dask workers.\n", " 3. Scale the cluster to the total # of workers. Needs to be a multiple of num_processes.\n", "\n", "In this example, we are defining one process (dask worker) per sbatch script. Each process will have 40 cpus (an entire node). We then scale the cluster to 9 workers, which total 360 threads at 1.15TB of memory." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "partition='short,brief-low'\n", "num_processes = 1\n", "num_threads_per_process = 40\n", "mem = 3.2*num_processes*num_threads_per_process\n", "n_cores_per_job = num_processes*num_threads_per_process\n", "container = '/lustre/project/geospatial_tutorials/wg_2020_ws/data_science_im_rs_vSCINetGeoWS_2020.sif'\n", "env = 'py_geo'\n", "clust = jq.SLURMCluster(queue=partition,\n", " processes=num_processes,\n", " memory=str(mem)+'GB',\n", " cores=n_cores_per_job,\n", " interface='ib0',\n", " local_directory='$TMPDIR',\n", " death_timeout=30,\n", " python=\"singularity exec {} /opt/conda/envs/{}/bin/python\".format(container,env),\n", " walltime='01:30:00',\n", " job_extra=[\"--output=/dev/null\",\"--error=/dev/null\"])\n", "cl=Client(clust)\n", "cl\n", "print('The Dask dashboard address is: /user/'+os.environ['USER']+'/proxy/'+cl.dashboard_link.split(':')[-1].split('/')[0]+'/status')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**View Cluster Dashboard**\n", "To view the cluster with the dask dashboard interaface click the dask icon on the left menu pane. Copy and paste the above dashboard address (in the form of /user/{User.Name}/proxy/{port#}/status) into the address bar. Then click on the \"Workers\", \"Progress\", \"Task Stream\", and \"CPU\" to open those tabs. Drag and arrange in convineint layout on right-hand side of the screen. Note these panes should be mostly blank as we have yet to scale the cluster, which is the next step below.\n", "\n", "Dask Icon:\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Scale the Cluster** to 9 workers (40 cpus per worker). This may take 5-20 seconds to complete.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#scale the cluster\n", "n_workers=9\n", "clust.scale(n=n_workers*num_processes)\n", "#Wait for the cluster to load, show progress bar.\n", "with tqdm(total=n_workers*num_processes) as pbar:\n", " while (((cl.status == \"running\") and (len(cl.scheduler_info()[\"workers\"]) < n_workers*num_processes))):\n", " pbar.update(len(cl.scheduler_info()[\"workers\"])-pbar.n)\n", " pbar.update(len(cl.scheduler_info()[\"workers\"])-pbar.n)\n", "cl" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Lets see the workers are running in SLURM\n", "me = os.environ['USER']\n", "!squeue -u $me" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Preprocess, Transform, and Merge the Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Harmonized Landsat Sentinel Data\n", "\n", "Link to data repository: https://hls.gsfc.nasa.gov/\n", "\n", "**Workflow:**\n", " 1. Data is stored in the Zarr format with three dimensions (x,y,time).\n", " 2. Read with xarray.\n", " 3. Divide the data into chunks. Here we have chunked the data by: x=20 pixels, y=20 pixels, date=Entire Dataset\n", " 4. Subset the data to only included \"growing season\" months.\n", " 5. Convert the xarray object to a 2-Dimensional dataframe.\n", " \n", "Notice that the data is not read to memory. The only information stored is the \"task graph\" and metadata about the final results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Read the data with Xarray and rechunk\n", "ndvi = xr.open_zarr('/lustre/project/geospatial_tutorials/wg_2020_ws/data/cper_hls_ndvi.zarr/').chunk({'x':20,'y':20,'date':-1})\n", "ndvi" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Select relevant months and then convert to a dataframe\n", "ndvi_df = ndvi.sel(date=ndvi['date.month'].isin([5,6,7,8,9])).to_dask_dataframe()\n", "#Only include reasonable values (.1 < NDVI < 1.0) in the analysis\n", "ndvi_df = ndvi_df[(ndvi_df.ndvi>.1)&(ndvi_df.ndvi<1.)]\n", "print('There are '+f'{len(ndvi_df):,}'+' NDVI observations.')\n", "ndvi_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Polaris Soil Hydraulic Data\n", "\n", "Paper Describing the Data: https://agupubs.onlinelibrary.wiley.com/doi/abs/10.1029/2018WR022797
\n", "Data Repository Source: http://hydrology.cee.duke.edu/POLARIS/PROPERTIES/v1.0/\n", "\n", "**Workflow:**\n", " 1. Data is stored in the Zarr format with two dimensions (x,y) and includes 13 variables at 6 depths (78 total). Read with xarray.\n", " 2. Interpolate the data to the same grid as the HLS NDVI data.\n", " 3. Convert the xarray object to a 2-Dimensional Pandas dataframe." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "soil = xr.open_zarr('/lustre/project/geospatial_tutorials/wg_2020_ws/data/polaris_soils.zarr/')\n", "#Interpolate to the HLS NDVI grid\n", "soil_df = soil.interp(x=ndvi.x,y=ndvi.y,method='linear').squeeze().to_dataframe().reset_index()\n", "soil_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### PRISM Precipitation, Tempature, and Vapor Pressure Deficit Data\n", "\n", "PRISM Data in a CSV file. Note this data was queried at a single point at the center of CPER.\n", "\n", "**Workflow:**\n", " 1. Data is stored in the csv format and includes 7 variables. Read with Pandas using:\n", " * Skip the 1st 10 rows (PRISM metadata)\n", " * Convert the time column from a generic object to a date-time object.\n", " 2. Rename the \"Date\" to \"date\" to match HLS NDVI data.\n", " 3. Set the \"date\" column as the index.\n", " 4. Sort the data into descending." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_env = pd.read_csv('/lustre/project/geospatial_tutorials/wg_2020_ws/data/PRISM_ppt_tmin_tmean_tmax_tdmean_vpdmin_vpdmax_provisional_4km_20120101_20200101_40.8269_-104.7154.csv',\n", " skiprows=10,\n", " infer_datetime_format=True,\n", " parse_dates = ['Date']).rename(columns={'Date':'date'}).set_index('date').sort_index(ascending=False)\n", "df_env" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Transform Function to Merge NDVI, Soil, and PRISM data.\n", "\n", "Here we develop a class to merge the three dataset. Note the most import code is in the ```def transform``` function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Costum transformer in the scikit-learn API syntax\n", "class merge_dsets(BaseEstimator,TransformerMixin):\n", " def __init__(self, df_soil, df_env,lag):\n", " self.soil = df_soil\n", " self.env = df_env\n", " self.lag = lag\n", " #self.lag_interv = lag_interval\n", " def fit(self, X, y=None):\n", " return self\n", " def transform(self, X, y=None):\n", " df = X.copy()\n", " df = df.merge(self.soil, on =['x','y'])\n", " df_env_m = pd.DataFrame()\n", " for i,d in enumerate(df.date.unique()):\n", " df_env_temp = df_env[df_env.index\n", "The \"*learn*\" portion in the ETL pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.pipeline import Pipeline\n", "import xgboost as xgb\n", "#from dask_ml.xgboost import XGBRegressor as dask_XGBRegressor\n", "from dask_ml.model_selection import train_test_split\n", "from sklearn.metrics import r2_score\n", "from dask_ml.model_selection import GridSearchCV\n", "from sklearn.model_selection import GridSearchCV as sk_GridSearchCV\n", "import joblib" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hyperparameter Optimization\n", "\n", "Shuffle and subset data to a *managable size* (e.g. will fit in memory when running 360 simaltaneous models). We will use a grid-search, combined with 3-fold cross validation, approach to optimize the relevant hyperparameters (see table below).\n", "\n", "| Hyperparameter | Grid | n |\n", "|------------|-------------------|---|\n", "|n_estimators | [150, 250, 300, 350] | 4 |\n", "|learning_rate| [0.05, 0.1, 0.2, 0.3]|4|\n", "|max_depth|[5, 7, 9, 11]|4|\n", "|colsample_bytree|[.1, .2, .3]|3|\n", "|gamma|[.05, .1, .2]|3|\n", "\n", "A total of 1728 models (4 * 4 * 4 * 3 * 3 * 3) will be fit. The hyperparameters assocated with the best scoring model (highest R2) will be used to train the remianing data.\n", "\n", "This search can take ~1-2 hour using 360 cores. To run the hyperparameter gridsearch cross validation, set the ```optimize_hyperparameter``` variable to ```True``` (see two cells below). If you leave as ```False```, we will skip the hyperparameter calculatoins, and just use the hyperparameter values previously calculated." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X_train_hyp, X = train_test_split(ndvi_df,\n", " test_size=0.97,\n", " shuffle=True,\n", " random_state=34)\n", "X_train_hyp,Y_train_hyp = dask.compute(*merge_dsets(df_soil=soil_df,\n", " df_env=df_env,\n", " lag=26).transform(X_train_hyp))\n", "X_train_hyp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set to True if you want to run the Gridsearch. This can take >1.5 hrs. Therefore, \n", "# if set to false, the results (best hyperparameters) hardcoded from a previous run \n", "# of the model\n", "optimize_hyperparameters = False" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if optimize_hyperparameters:\n", " #Define the grid - space\n", " param_dist = {'n_estimators': [150,250,300,350],\n", " 'learning_rate': [0.05, 0.1, 0.2, 0.3],\n", " 'max_depth': [5, 7, 9, 11],\n", " 'colsample_bytree': [.1, .2, .3],\n", " 'gamma': [.05, .1, .2]}\n", " #Define the XGBoost model\n", " reg = xgb.XGBRegressor(n_jobs=1,verbosity=3)\n", " #Setup the GridsearchCV function\n", " gs = GridSearchCV(reg,param_dist,cv=3,scheduler=cl,refit=False,cache_cv=False)\n", " #Fit all the models\n", " gs.fit(X_train_hyp.values,Y_train_hyp.values)\n", " #Get the best fitting parameters\n", " df_params = pd.DataFrame(gs.cv_results_)\n", " best_params = df_params[df_params.mean_test_score==df_params.mean_test_score.max()]\n", " best_params = best_params.params.values[0]\n", " print(best_params)\n", "else:\n", " #Best fit parameters from previous run\n", " best_params = {'colsample_bytree': 0.2,\n", " 'gamma': 0.1,\n", " 'learning_rate': 0.05,\n", " 'max_depth': 7,\n", " 'n_estimators': 350}\n", " print('Using the previously calculated parameters, which are:')\n", " print(best_params)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Distributed XGBoost Model\n", "\n", " * Shuffle and split data into \"training\" (80%) and \"testing\" (20%). Leave as dask dataframes (data needs to be distributed across all workers), so we will call ```dask.persist``` to trigger the calculation (rather than dask.compute).\n", " * Train XGBoost model using the training data.\n", " * Model Validation / Accuracy (r2) with \"testing\" data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Split the data\n", "X_train, X_test = train_test_split(X,\n", " test_size=0.2,\n", " shuffle=True)\n", "#Merge the weather/soil data and persist the data across the cluster\n", "[X_train,Y_train],[X_test,Y_test] = dask.persist(*[merge_dsets(df_soil=soil_df,df_env=df_env,lag=26).transform(X_train),\n", " merge_dsets(df_soil=soil_df,df_env=df_env,lag=26).transform(X_test)])\n", "wait([X_train,X_test,Y_train,Y_test])\n", "X_train" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Setup the Distributed XGBoost model and train it on the \"training\" data\n", "dtrain = xgb.dask.DaskDMatrix(cl, X_train, Y_train)\n", "reg_b = xgb.dask.train(cl,\n", " best_params,\n", " dtrain,\n", " num_boost_round=125,\n", " evals=[(dtrain, 'train')])\n", "print(reg_b)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Get the R2 results for the testing data\n", "dtest = xgb.dask.DaskDMatrix(cl, X_test)\n", "pred = xgb.dask.predict(cl, reg_b['booster'], dtest)\n", "reg_r2 = r2_score(Y_test.ndvi.compute().values,pred)\n", "print(\"The overall R2 is: \"+str(reg_r2))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Big Data Plotting Libraries\n", "import datashader as ds\n", "import holoviews as hv\n", "from holoviews.operation.datashader import datashade, shade, dynspread, rasterize\n", "hv.extension('bokeh')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Plot the results\n", "Y_plotting = Y_test.compute()\n", "Y_plotting['pred']=pred.compute()\n", "Y_plotting" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#To plot all the points, we need to rasterize the data (aka a 2d histogram)\n", "pts_res = hv.Points(Y_plotting.values,label=\"\")\n", "rasterize(pts_res).redim.range(Count=(10, 2000)).opts(cmap='viridis',\n", " tools=['hover'],\n", " xlim=(0.15,.6),\n", " ylim=(0.15,.6),\n", " clipping_colors={'min': 'transparent'},\n", " xlabel='HLS NDVI',\n", " ylabel='Predicted NDVI',\n", " logz=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interpreting the Model\n", "\n", "**Use the [SHAP (SHapley Additive exPlanations) package](https://github.com/slundberg/shap)** to interpret the model results and better understand the features \"driving\" ndvi dynamics.\n", "\n", "SHAP Papers: https://www.nature.com/articles/s42256-019-0138-9 and http://papers.nips.cc/paper/7062-a-unified-approach-to-interpreting-model-predictions\n", "\n", "SHAP Blog: https://towardsdatascience.com/interpretable-machine-learning-with-xgboost-9ec80d148d27" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Standard approaches can render different results\n", "#Show the top 20 most import features as defined by the XGBoost model\n", "xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='weight')\n", "xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='gain')\n", "xgb.plot_importance(reg_b['booster'],max_num_features=20,importance_type='cover')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Import the SHAP libraries\n", "import shap\n", "import matplotlib.pyplot as plt\n", "shap.initjs()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Split data into better manageable slices\n", "X_shap, _= train_test_split(X_test,test_size=0.95,shuffle=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Apply SHAP Model:** Below we split the data by month, and examine the effect of the features on the model (by month)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Day of Year for each month\n", "months = {'May':[121,152],\n", " 'June':[153,182],\n", " 'July':[183,2013],\n", " 'August':[214,244],\n", " 'September':[245,274]}\n", "\n", "#Function for calculating SHAP values. We will map this function across the data on the cluster\n", "def calc_shap_vals(block,explainer):\n", " if len(block)>0:\n", " block_vals = explainer.shap_values(block)\n", " return(block_vals)\n", " else:\n", " return(np.empty((0,184)))\n", "\n", "#Loop over each month and create plot\n", "explainer = shap.TreeExplainer(reg_b['booster'])\n", "for k in months.keys():\n", " print(k)\n", " start = months[k][0]\n", " end = months[k][1]\n", " #Select only the data in the month\n", " X_shap1 = X_shap[(X_shap.DOY>=start)&(X_shap.DOY<=end)].repartition(npartitions=9).persist()\n", " wait(X_shap1)\n", " #Compute the SHAP values\n", " shap_vals = X_shap1.to_dask_array(lengths=True).map_blocks(calc_shap_vals,explainer=explainer,dtype='float32').compute()\n", " #Show the SHAP summary plots for each month\n", " print('Using an array of size:' +str(shap_vals.shape))\n", " plt.title(k)\n", " shap.summary_plot(shap_vals, X_shap1.compute(),max_display=20,title=k)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "shap_vals = X_shap.to_dask_array(lengths=True).map_blocks(calc_shap_vals,explainer=explainer,dtype='float32').compute()\n", "shap_vals = shap_vals[~np.isnan(shap_vals).any(axis=1)]\n", "shap.dependence_plot(\"week0_tmean\", shap_vals, X_shap.compute(),interaction_index='DOY')\n", "shap.dependence_plot(\"week0_ppt\", shap_vals, X_shap.compute(),interaction_index='DOY')\n", "shap.dependence_plot(\"week4_vpdmax\", shap_vals, X_shap.compute(),interaction_index='DOY')" ] } ], "metadata": { "kernelspec": { "display_name": "Python [conda env:py_geo]", "language": "python", "name": "conda-env-py_geo-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }