# Dask-jobqueue in Action

In [1]:
from dask_jobqueue import PBSCluster

import dask

# Dashboard URL template that routes through the JupyterHub proxy.
# {USER} and {port} stay as literal placeholders in this plain (non-f) string.
DASHBOARD_LINK = 'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/dav-compute/proxy/{port}/status'

dask.config.set({'distributed.dashboard.link': DASHBOARD_LINK})

<dask.config.set at 0x2b7e89a7a520>

## Instantiate a cluster object

In [2]:
# One single-threaded, GPU-backed dask worker per PBS job.
cluster = PBSCluster(
    # Where and for how long each job runs.
    queue="casper",
    project="NTDD0005",
    walltime="00:30:00",
    # Size of the worker started inside each job.
    cores=1,
    processes=1,
    memory="10GB",
    # PBS resource request: one CPU, 10GB of memory and one GPU, pinned to a V100.
    resource_spec="select=1:ncpus=1:mem=10GB:ngpus=1",
    job_extra=['-l gpu_type=v100'],
    # Emitted into the job script before the worker command so CUDA is loaded.
    env_extra=['module load cuda/11.0.3'],
    # Tag each worker with an abstract GPU resource: special hardware that the
    # scheduler is not otherwise aware of.
    extra=['--resources GPU=1'],
)

cluster

VBox(children=(HTML(value='<h2>PBSCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .d…

## Batch Job Script

In [3]:
# Show the batch script generated for each dask-worker job: PBS directives,
# the `module load` line, then the dask worker command.
print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q casper
#PBS -A NTDD0005
#PBS -l select=1:ncpus=1:mem=10GB:ngpus=1
#PBS -l walltime=00:30:00
#PBS -e /glade/scratch/abanihi/
#PBS -o /glade/scratch/abanihi/
#PBS -l gpu_type=v100
module load cuda/11.0.3
/glade/work/abanihi/opt/miniconda/envs/dask-gpu/bin/python -m distributed.cli.dask_worker tcp://10.12.205.17:41700 --nthreads 1 --memory-limit 9.31GiB --name dummy-name --nanny --death-timeout 60 --local-directory /glade/scratch/abanihi --resources GPU=1 --interface ib0 --protocol tcp://



In [4]:
!qstat -u abanihi

                                                            Req'd  Req'd   Elap
Job ID          Username Queue    Jobname    SessID NDS TSK Memory Time  S Time
--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----
240546.casper-* abanihi  jhublog* STDIN      217212   1   1    4gb 720:0 R 226:0
251037.casper-* abanihi  jhublog* STDIN      238092   1   1    4gb 720:0 R 149:3
282298.casper-* abanihi  tdd      STDIN       10906   1   1   10gb 00:30 R 00:01
282301.casper-* abanihi  tdd      dask-work*  31679   1   1   10gb 00:30 R 00:00
282302.casper-* abanihi  tdd      dask-work*    --    1   1   10gb 00:30 R   -- 


## Cluster Scaling APIs

In [5]:
# Scale to two workers: per the dask-jobqueue docs a positional count is in
# workers, not jobs.
cluster.scale(2)

In [6]:
# Same target expressed in PBS jobs; with processes=1 each job hosts one worker.
cluster.scale(jobs=2)

In [7]:
# Adaptive scaling with lower and upper bounds of 2 and 4.
cluster.adapt(minimum=2, maximum=4)

<distributed.deploy.adaptive.Adaptive at 0x2b7e8985eac0>

In [8]:
# Adaptive scaling again, this time with the bounds given as numbers of jobs.
cluster.adapt(minimum_jobs=2, maximum_jobs=4)

<distributed.deploy.adaptive.Adaptive at 0x2b7eacd3f400>

## Connect a client to the created cluster

In [9]:
from distributed import Client

# Create a client for the cluster; the nvidia_smi helper below uses this
# notebook-level `client` to run functions on the workers and the scheduler.
client = Client(cluster)

## Confirm that our dask workers have access to GPUs

In [10]:
def get_nvidia_smi_info():
    """Run ``nvidia-smi`` in the calling process's environment and return its report.

    Returns
    -------
    str
        The command's standard output, stripped of leading and trailing
        whitespace and decoded as UTF-8.
    """
    # Local import keeps the function self-contained; it is handed to
    # client.run / client.run_on_scheduler rather than called directly.
    import subprocess

    raw_report = subprocess.check_output('nvidia-smi')
    # Strip the raw bytes first, then decode them to text.
    trimmed = raw_report.strip()
    return trimmed.decode('utf-8')


def nvidia_smi(on='workers'):
    """Print the ``nvidia-smi`` report from the dask workers or the scheduler.

    Uses the notebook-level ``client`` and the ``get_nvidia_smi_info`` helper.

    Parameters
    ----------
    on : str, optional
        ``'workers'`` (default) runs the query through ``client.run`` and
        prints one report per returned key (the worker address);
        ``'scheduler'`` runs it once through ``client.run_on_scheduler``.

    Raises
    ------
    ValueError
        If ``on`` is neither ``'workers'`` nor ``'scheduler'``, rather than
        silently printing nothing.
    """
    if on == 'workers':
        # One entry per worker: address -> nvidia-smi text.
        reports = client.run(get_nvidia_smi_info)
        print(" ***** NVIDIA-SMI info on Workers *****")
        for address, report in reports.items():
            print("*" * 80)
            print(address)
            print(report, end="\n\n")

    elif on == 'scheduler':
        print("***** NVIDIA-SMI info on Scheduler *****")
        print(client.run_on_scheduler(get_nvidia_smi_info))

    else:
        # A typo such as on='worker' would otherwise look like "no output".
        raise ValueError(f"on must be 'workers' or 'scheduler', got {on!r}")

In [11]:
# Default target is the workers: print one nvidia-smi report per worker address.
nvidia_smi()

 ***** NVIDIA-SMI info on Workers *****
********************************************************************************
tcp://10.12.205.38:40174
Wed May 19 22:18:08 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.06    Driver Version: 450.51.06    CUDA Version: 11.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  On   | 00000000:B2:00.0 Off |                    0 |
| N/A   31C    P0    40W / 300W |      0MiB / 32510MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                      

In [12]:
# Same check, run on the scheduler process instead of the workers.
nvidia_smi(on='scheduler')

***** NVIDIA-SMI info on Scheduler *****
Wed May 19 22:18:09 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.06    Driver Version: 450.51.06    CUDA Version: 11.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  On   | 00000000:B3:00.0 Off |                    0 |
| N/A   28C    P0    40W / 300W |      0MiB / 32510MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+----------------------------------------------

## Run some computation

In [13]:
import cupy

import dask.array as da

In [21]:
# generate chunked dask arrays of many cupy random arrays
rs = da.random.RandomState(RandomState=cupy.random.RandomState)  # <-- we specify cupy here
x = rs.normal(10, 1, size=(200000, 200000), chunks=(10000, 4000), dtype=cupy.float32)
x

Unnamed: 0,Array,Chunk
Bytes,149.01 GiB,152.59 MiB
Shape,"(200000, 200000)","(10000, 4000)"
Count,1000 Tasks,1000 Chunks
Type,float32,cupy.ndarray
"Array Chunk Bytes 149.01 GiB 152.59 MiB Shape (200000, 200000) (10000, 4000) Count 1000 Tasks 1000 Chunks Type float32 cupy.ndarray",200000  200000,

Unnamed: 0,Array,Chunk
Bytes,149.01 GiB,152.59 MiB
Shape,"(200000, 200000)","(10000, 4000)"
Count,1000 Tasks,1000 Chunks
Type,float32,cupy.ndarray


In [22]:
# Add one to every element, keep every other row and column, reduce along
# axis 0 to one standard deviation per remaining column, and persist the
# result in a single chained expression.
y = (x + 1)[::2, ::2].std(axis=0).persist()
y

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,7.81 kiB
Shape,"(100000,)","(2000,)"
Count,50 Tasks,50 Chunks
Type,float32,cupy.ndarray
"Array Chunk Bytes 390.62 kiB 7.81 kiB Shape (100000,) (2000,) Count 50 Tasks 50 Chunks Type float32 cupy.ndarray",100000  1,

Unnamed: 0,Array,Chunk
Bytes,390.62 kiB,7.81 kiB
Shape,"(100000,)","(2000,)"
Count,50 Tasks,50 Chunks
Type,float32,cupy.ndarray


In [23]:
%%time
# Materialise the persisted array in this session; the cell magic above must
# stay on the first line of the cell.
result = y.compute()

CPU times: user 30 ms, sys: 2 ms, total: 32 ms
Wall time: 36.7 ms


In [24]:
# Inspect the per-column standard deviations; they should sit near the scale of 1
# used when sampling.
result

array([0.9968655 , 0.99782085, 1.0016662 , ..., 1.0015868 , 0.9996461 ,
       0.9991475 ], dtype=float32)

In [25]:
# Check which array type came back from compute().
type(result)

cupy._core.core.ndarray

In [26]:
# Query the workers again to compare GPU memory usage with the report taken
# before the computation.
nvidia_smi()

 ***** NVIDIA-SMI info on Workers *****
********************************************************************************
tcp://10.12.205.38:40174
Wed May 19 22:23:17 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.06    Driver Version: 450.51.06    CUDA Version: 11.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  On   | 00000000:B2:00.0 Off |                    0 |
| N/A   32C    P0    52W / 300W |    926MiB / 32510MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                      