{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## [03_Big_Data.ipynb](https://github.com/raybellwaves/xskillscore-tutorial/blob/master/03_Big_Data.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, I verify 12 million forecasts in a couple of seconds using the RMSE metric on a `dask.array`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import xarray as xr\n", "import pandas as pd\n", "import numpy as np\n", "import xskillscore as xs\n", "import dask.array as da\n", "from dask.distributed import Client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default, the [`dask.distributed.Client`](https://distributed.dask.org/en/latest/client.html) uses a [`LocalCluster`](https://docs.dask.org/en/latest/setup/single-distributed.html#localcluster), so calling `Client()` with no arguments is roughly equivalent to:\n", "\n", "```\n", "from dask.distributed import Client, LocalCluster\n", "\n", "cluster = LocalCluster()\n", "client = Client(cluster)\n", "```\n", "\n", "However, this code can easily be adapted to scale to massive datasets with distributed computing, using one of several deployment methods:\n", "\n", " - [Kubernetes](https://docs.dask.org/en/latest/setup/kubernetes.html)\n", " - [High Performance Computers](https://docs.dask.org/en/latest/setup/hpc.html)\n", " - [YARN](https://yarn.dask.org/en/latest/)\n", " - [AWS Fargate](https://aws.amazon.com/fargate/)\n", " \n", "or vendor products:\n", " \n", " - [SaturnCloud](https://www.saturncloud.io/s/)\n", " \n", "If you run this example on a large cluster, I would be curious to hear how far you can scale `nstores` and `nskus` and how long `rmse` takes to run. You are welcome to post your results in the issue section using this [link](https://github.com/raybellwaves/xskillscore-tutorial/issues/new/choose)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up the client (i.e. connect to the scheduler):" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
Client\n",
"Cluster\n",
"  • Workers: 3\n",
"  • Cores: 6\n",
"  • Memory: 16.73 GB\n" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client = Client()\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Due to the success of your previous forecast (and verification using xskillscore!), the company you work for has expanded. It now has 4,000 stores, each with 3,000 products:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "That's 12,000,000 different forecasts to verify!\n" ] } ], "source": [ "nstores = 4000\n", "nskus = 3000\n", "\n", "nforecasts = nstores * nskus\n", "print(f\"That's {nforecasts:,d} different forecasts to verify!\")\n", "\n", "stores = np.arange(nstores)\n", "skus = np.arange(nskus)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The time period of interest covers the same dates, but for 2021:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "dates = pd.date_range(\"1/1/2021\", \"1/5/2021\", freq=\"D\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up the data as a `dask.array` with dimensions dates x stores x skus.\n", "\n", "`dask` provides functions similar to those in `numpy`. In this case, switch `np.` to `da.` to generate random integers from 1 to 9 (inclusive):" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n",
"         Array            Chunk\n",
"Bytes    480.00 MB        60.00 MB\n",
"Shape    (5, 4000, 3000)  (5, 1000, 1500)\n",
"Count    16 Tasks         8 Chunks\n",
"Type     int64            numpy.ndarray\n",
"\n",
"[Chunk diagram; axis labels: 3000, 4000, 5]\n" ], "text/plain": [ "dask.array" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = da.random.randint(9, size=(len(dates), len(stores), len(skus))) + 1\n", "data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Put this into an `xarray.DataArray` and specify the coordinates and dimensions:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
xarray.DataArray 'add-7f8918891a510f68ee4b513f895573f3' (DATE: 5, STORE: 4000, SKU: 3000)\n",
"dask.array<chunksize=(5, 1000, 1500), meta=np.ndarray>\n",
"\n",
"         Array            Chunk\n",
"Bytes    480.00 MB        60.00 MB\n",
"Shape    (5, 4000, 3000)  (5, 1000, 1500)\n",
"Count    16 Tasks         8 Chunks\n",
"Type     int64            numpy.ndarray\n",
"\n",
"[Chunk diagram; axis labels: 3000, 4000, 5]\n",
"\n",
"Coordinates:\n",
"  * DATE   (DATE)  datetime64[ns] 2021-01-01 ... 2021-01-05\n",
"  * STORE  (STORE) int64 0 1 2 3 4 ... 3996 3997 3998 3999\n",
"  * SKU    (SKU)   int64 0 1 2 3 4 ... 2996 2997 2998 2999
" ], "text/plain": [ "\n", "dask.array\n", "Coordinates:\n", " * DATE (DATE) datetime64[ns] 2021-01-01 2021-01-02 ... 2021-01-05\n", " * STORE (STORE) int64 0 1 2 3 4 5 6 ... 3993 3994 3995 3996 3997 3998 3999\n", " * SKU (SKU) int64 0 1 2 3 4 5 6 7 ... 2993 2994 2995 2996 2997 2998 2999" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "y = xr.DataArray(data, coords=[dates, stores, skus], dims=[\"DATE\", \"STORE\", \"SKU\"])\n", "y" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a prediction array similar to that in [01_Deterministic.ipynb](https://github.com/raybellwaves/xskillscore-tutorial/blob/master/01_Determinisitic.ipynb):" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
xarray.DataArray (DATE: 5, STORE: 4000, SKU: 3000)\n",
"dask.array<chunksize=(5, 1000, 1500), meta=np.ndarray>\n",
"\n",
"         Array            Chunk\n",
"Bytes    480.00 MB        60.00 MB\n",
"Shape    (5, 4000, 3000)  (5, 1000, 1500)\n",
"Count    40 Tasks         8 Chunks\n",
"Type     float64          numpy.ndarray\n",
"\n",
"[Chunk diagram; axis labels: 3000, 4000, 5]\n",
"\n",
"Coordinates:\n",
"  * DATE   (DATE)  datetime64[ns] 2021-01-01 ... 2021-01-05\n",
"  * STORE  (STORE) int64 0 1 2 3 4 ... 3996 3997 3998 3999\n",
"  * SKU    (SKU)   int64 0 1 2 3 4 ... 2996 2997 2998 2999\n" ], "text/plain": [ "\n", "dask.array\n", "Coordinates:\n", " * DATE (DATE) datetime64[ns] 2021-01-01 2021-01-02 ... 2021-01-05\n", " * STORE (STORE) int64 0 1 2 3 4 5 6 ... 3993 3994 3995 3996 3997 3998 3999\n", " * SKU (SKU) int64 0 1 2 3 4 5 6 7 ... 2993 2994 2995 2996 2997 2998 2999" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "noise = da.random.uniform(-1, 1, size=(len(dates), len(stores), len(skus)))\n", "yhat = y + (y * noise)\n", "yhat" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, calculate the RMSE for each store and SKU by reducing over the `DATE` dimension.\n", "\n", "Use the [`.compute()`](https://distributed.dask.org/en/latest/manage-computation.html) method to run the computation and return the values:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 162 ms, sys: 119 ms, total: 281 ms\n", "Wall time: 2.08 s\n" ] }, { "data": { "text/html": [ "
xarray.DataArray (STORE: 4000, SKU: 3000)\n",
"array([[3.41682887, 3.23411861, 2.69707474, ..., 2.87026685, 3.68759293,\n",
"        2.6408434 ],\n",
"       [3.16087021, 2.17575623, 4.33849841, ..., 2.24438267, 5.54621049,\n",
"        2.04912144],\n",
"       [1.81492691, 2.04205526, 1.63489789, ..., 4.26397024, 1.33314852,\n",
"        4.22996336],\n",
"       ...,\n",
"       [5.12907457, 0.64156566, 3.15916421, ..., 3.2282438 , 5.43960656,\n",
"        3.9416223 ],\n",
"       [3.84933788, 3.28570209, 2.29624814, ..., 1.48937493, 1.32878641,\n",
"        2.65736995],\n",
"       [0.5710644 , 1.32473384, 4.10346585, ..., 4.25863161, 2.54228724,\n",
"        4.75186667]])\n",
"Coordinates:\n",
"  * STORE  (STORE) int64 0 1 2 3 4 ... 3996 3997 3998 3999\n",
"  * SKU    (SKU)   int64 0 1 2 3 4 ... 2996 2997 2998 2999
" ], "text/plain": [ "\n", "array([[3.41682887, 3.23411861, 2.69707474, ..., 2.87026685, 3.68759293,\n", " 2.6408434 ],\n", " [3.16087021, 2.17575623, 4.33849841, ..., 2.24438267, 5.54621049,\n", " 2.04912144],\n", " [1.81492691, 2.04205526, 1.63489789, ..., 4.26397024, 1.33314852,\n", " 4.22996336],\n", " ...,\n", " [5.12907457, 0.64156566, 3.15916421, ..., 3.2282438 , 5.43960656,\n", " 3.9416223 ],\n", " [3.84933788, 3.28570209, 2.29624814, ..., 1.48937493, 1.32878641,\n", " 2.65736995],\n", " [0.5710644 , 1.32473384, 4.10346585, ..., 4.25863161, 2.54228724,\n", " 4.75186667]])\n", "Coordinates:\n", " * STORE (STORE) int64 0 1 2 3 4 5 6 ... 3993 3994 3995 3996 3997 3998 3999\n", " * SKU (SKU) int64 0 1 2 3 4 5 6 7 ... 2993 2994 2995 2996 2997 2998 2999" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%time xs.rmse(y, yhat, 'DATE').compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" } }, "nbformat": 4, "nbformat_minor": 4 }