Raad2: Python Dask
What is Dask?
Dask enables Python packages to scale across several nodes. It works with the existing Python ecosystem to scale computations from multi-core machines to distributed clusters. [1]
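As a small illustration of this idea (a minimal sketch, assuming only that the dask package is installed), a Dask array splits a large array into chunks and computes on them in parallel:

import dask.array as da

# A 10000x10000 array split into 100 chunks of 1000x1000 each;
# nothing runs until .compute() is called.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())  # chunks are reduced in parallel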
Prepare Environment
Compute nodes on raad2 don't have local storage. You can set a TMPDIR variable which points to a tmp directory in your raad2 home directory. If you miss this step, you will get errors related to "/tmp" when you launch Dask workers.
mkdir -p ~/tmp
echo 'export TMPDIR=~/tmp' >> ~/.bashrc
source ~/.bashrc
Test/Dev in Interactive Mode
Submit an interactive job
An interactive job allows you to test or develop your code on a compute node. Running test codes on the shared login node can affect its performance for other users, so it is highly recommended to run your test computation on a compute node by submitting an interactive job.
muarif092@raad2a:~> sinteractive
muarif092@nid00388:~>
Create a Conda Dask Environment
If you already have a Conda environment for your project and plan to use it for the Dask project, the steps below can be skipped. Otherwise, create a Conda virtual environment named "daskproj".
muarif092@nid00388:~> source /lustre/sw/xc40ac/anaconda3/etc/profile.d/conda.sh
muarif092@nid00388:~> conda create --name daskproj
Install Dask Packages
muarif092@nid00388:~> conda activate daskproj
(daskproj) muarif092@nid00388:~> conda install dask ipython
(daskproj) muarif092@nid00388:~> conda install -c conda-forge dask-jobqueue
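As a quick sanity check (a minimal sketch; your version numbers will differ), confirm that both packages import cleanly inside the activated environment:

# Run inside the activated daskproj environment
import dask
import dask_jobqueue
print(dask.__version__, dask_jobqueue.__version__)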
Run Test Dask Code
Launch Python
You can use ipython or plain python.
(daskproj) muarif092@nid00388:~> ipython
Python 3.7.2 (default, Dec 29 2018, 06:19:36)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.3.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:
Import SLURMCluster
From the Dask-Jobqueue library, import SLURMCluster.
from dask_jobqueue import SLURMCluster
Create Cluster Object
We will use this in the next step to spawn workers with the resources specified in the 'cluster' object. Here we are creating a cluster object with cores=2, memory=10GB and walltime=30 minutes.
cluster = SLURMCluster(queue='s_debug',
                       cores=2,
                       memory='10GB',
                       interface='ipogif0',
                       walltime='00:30:00',
                       job_extra=['--qos=sd',
                                  '--hint=nomultithread',
                                  '--gres=craynetwork:0',
                                  '-o dask_job.%j.%N.out',
                                  '-e dask_job.%j.%N.error'])
At this point, if you print the cluster object, all resources are listed as 0 because no workers have been launched yet.
print(cluster)
SLURMCluster(cores=0, memory=0 B, workers=0/0, jobs=0/0)
Scale the cluster
This step will launch 4 workers. Each worker will have the resources defined in the 'cluster' object. You can scale to a larger number if required.
cluster.scale(4)
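As an alternative to a fixed worker count, Dask also supports adaptive scaling, where worker jobs are started and stopped based on load (a minimal sketch; the bounds here are illustrative):

cluster.adapt(minimum=0, maximum=4)  # grow/shrink between 0 and 4 worker jobs as load changes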
In a different SSH session to raad2, you can verify whether the requested dask-worker jobs have been launched.
muarif092@raad2a:~> squeue -u muarif092
JOBID USER ACCOUNT NAME ST REASON START_TIME TIME TIME_LEFT NODES CPUS
4192018 muarif09 default dask-worker R None 2019-03-21T12:39:31 0:07 29:53 1 4
4192019 muarif09 default dask-worker R None 2019-03-21T12:39:31 0:07 29:53 1 4
4192020 muarif09 default dask-worker R None 2019-03-21T12:39:31 0:07 29:53 1 4
4192021 muarif09 default dask-worker R None 2019-03-21T12:39:31 0:07 29:53 1 4
Now if you print the cluster object, you will see available resources have increased.
print(cluster)
SLURMCluster(cores=8, memory=40.00 GB, workers=4/4, jobs=4/4)
Connect client to cluster
from distributed import Client
client = Client(cluster)
print(client)
<Client: scheduler='tcp://10.128.1.135:39839' processes=4 cores=8>
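The client also exposes the URL of Dask's diagnostic dashboard, which shows live task and worker activity. Viewing it from your workstation typically requires an SSH tunnel to the compute node (an assumption about your local setup):

print(client.dashboard_link)  # URL of the Dask diagnostic dashboard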
Do a simple computation to verify parallelism
Define a function 'slow_increment', which takes a number and returns that number + 1.
import time
def slow_increment(x):
    time.sleep(1)
    return x + 1
futures = client.map(slow_increment, range(1000))
You can check the progress of your computation with the "progress" function.
from dask.distributed import progress
progress(futures)
[## ] | 7% Completed | 5.8s
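Once the computation completes, you can collect the results back to the client with client.gather:

results = client.gather(futures)
print(results[:5])  # [1, 2, 3, 4, 5]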
Disconnect client
client.close()
Shutdown Dask Cluster
cluster.close()
Submit Python Dask Code as a Batch Job
Interactive jobs are useful if you want to test and develop your code. For a large computation that you want to run on a shared resource, it is more efficient to submit it as a batch job.
Create a Dask jobqueue.yaml file
This file should be located under your home directory at "~/.config/dask" and it defines the properties of the cluster object.
Create or modify 'jobqueue.yaml'.
muarif092@raad2a:~> mkdir -p ~/.config/dask
muarif092@raad2a:~> cd ~/.config/dask
muarif092@raad2a:~> touch jobqueue.yaml
Edit ~/.config/dask/jobqueue.yaml and add the contents below.
jobqueue:
  slurm:
    name: dask-worker

    # Dask worker options
    cores: 24                 # Total number of cores per job
    memory: 120 GB            # Total amount of memory per job
    processes: 1              # Number of Python processes per job
    interface: ipogif0        # Network interface to use like eth0 or ib0
    death-timeout: 180        # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null     # Location of fast local storage like /scratch or $TMPDIR

    # SLURM resource manager options
    queue: l_long
    walltime: '24:00:00'      # 24-hour walltime. In l_long this can go up to 168:00:00 i.e. 1 week
    job-extra: ['--qos=ll', '-o dask_job.%j.%N.out', '-e dask_job.%j.%N.error']
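Before writing any application code, you can verify that the YAML file is being picked up by printing the SLURM submission script that dask-jobqueue generates for each worker job (a quick check using the job_script() method):

from dask_jobqueue import SLURMCluster
cluster = SLURMCluster()      # reads ~/.config/dask/jobqueue.yaml
print(cluster.job_script())   # the generated sbatch script for one worker job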
Write your Python Dask Code
Sample Code [2]
# This sample code is obtained and modified from: https://github.com/dask/dask-jobqueue/issues/206
import time
import numpy as np
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname

def slow_increment(x):
    # Allocate a large array to simulate a memory-heavy workload
    m = np.random.random((100000, 10000))
    time.sleep(1)
    return [x + 1, gethostname(), time.time()]

# Cluster configuration is defined in ~/.config/dask/jobqueue.yaml
cluster = SLURMCluster()

# Scale cluster to 2 worker jobs
nodes = 2
cluster.scale(nodes)

# Wait for the workers to start
time.sleep(30)

# Connect client to the cluster
client = Client(cluster)

# Launch computation
futures = client.map(slow_increment, range(10))

# Check progress
from dask.distributed import progress
progress(futures)

# Gather results
result = client.gather(futures)

print('\n::: start user output :::')
print(client)
print('::: end user output :::\n')

# Print results
for record in result:
    print(record)

client.close()
cluster.close()
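As a side note, instead of the fixed time.sleep(30) above, you can block until the workers have actually connected by using the client's wait_for_workers method (a minimal sketch; the client must be created first):

client = Client(cluster)
client.wait_for_workers(nodes)  # returns once all requested workers are up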
Create and Submit a SLURM Batch Job File
Create a SLURM job file "dask_launcher.job".
#!/bin/bash
#SBATCH -J dask_launcher
#SBATCH -p s_long
#SBATCH --qos sl
#SBATCH --time=168:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --output=dask_launcher.o%j
#SBATCH --error=dask_launcher.e%j
#SBATCH --hint=nomultithread
#SBATCH --gres=craynetwork:0
source /lustre/sw/xc40ac/anaconda3/etc/profile.d/conda.sh
conda activate daskproj
python dask_sample.py
Submit the batch job
sbatch dask_launcher.job
Check the progress of your code
tail -f dask_launcher.o<jid>