Raad2: Python Dask

From Research Computing @ TAMUQ


What is Dask?

Dask scales Python packages across several nodes. It works with the existing Python ecosystem and extends it to multi-core machines and distributed clusters. [1]

Prepare environment

Compute nodes on raad2 don't have local storage. Set a TMPDIR variable that points to a tmp directory in your raad2 home directory. If you skip this step, you will get errors related to "/tmp" when you launch Dask workers.

mkdir -p ~/tmp
echo 'export TMPDIR=~/tmp' >> ~/.bashrc
source ~/.bashrc
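
To confirm the setting took effect, you can check from Python that the directory named by TMPDIR exists and is writable. The sketch below is an illustration only; `check_tmpdir` is a hypothetical helper name, not part of Dask or raad2 tooling.

```python
import os
import tempfile


def check_tmpdir(path):
    """Return True if `path` is an existing directory where a temp file can be created."""
    if not path or not os.path.isdir(path):
        return False
    try:
        # Creating (and automatically deleting) a file shows the directory is writable.
        with tempfile.TemporaryFile(dir=path):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    tmpdir = os.environ.get("TMPDIR")
    print("TMPDIR =", tmpdir, "| usable:", check_tmpdir(tmpdir))
```

If the check reports that the directory is not usable, re-run the mkdir and source commands above in the same shell.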

Test/Dev in Interactive Mode

Submit an interactive job

An interactive job allows you to test or develop your code on a compute node. Running test code on the shared login node can degrade its performance for other users, so it is highly recommended to run test computations on a compute node by submitting an interactive job.

muarif092@raad2a:~> sinteractive
muarif092@nid00388:~>

Create a Conda Dask Environment

If you already have a Conda environment for your project and plan to use it for the Dask project, you can skip the steps below. Otherwise, create a Conda virtual environment named "daskproj".

muarif092@nid00388:~> source /lustre/sw/xc40ac/anaconda3/etc/profile.d/conda.sh
muarif092@nid00388:~> conda create --name daskproj

Install Dask Packages

muarif092@nid00388:~> conda activate daskproj
(daskproj) muarif092@nid00388:~> conda install dask ipython
(daskproj) muarif092@nid00388:~> conda install -c conda-forge dask-jobqueue

Run Test Dask Code

Launch Python

You can use either IPython or the plain Python interpreter.

(daskproj) muarif092@nid00388:~> ipython
Python 3.7.2 (default, Dec 29 2018, 06:19:36)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.3.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:

Import SLURMCluster

From the Dask Jobqueue library (dask_jobqueue), import SLURMCluster.

from dask_jobqueue import SLURMCluster

Create Cluster Object

The cluster object defines the resources that each worker gets; we will use it in the next step to spawn workers. Here we create a cluster object with cores=2, memory=10GB and walltime=30 minutes.

cluster = SLURMCluster(
    queue='s_debug',
    cores=2,
    memory='10GB',
    interface='ipogif0',
    walltime='00:30:00',
    job_extra=['--qos=sd', '--hint=nomultithread', '--gres=craynetwork:0',
               '-o dask_job.%j.%N.out', '-e dask_job.%j.%N.error'])

At this point no workers have been started, so printing the cluster object lists all resources as 0.

print(cluster)
SLURMCluster(cores=0, memory=0 B, workers=0/0, jobs=0/0)

Scale the cluster

This step launches 4 workers. Each worker gets the resources defined in the cluster object. You can scale to a larger number if required.

cluster.scale(4)

In a different ssh session to raad2, you can verify that the requested Dask workers have been launched.

muarif092@raad2a:~> squeue -u muarif092
   JOBID     USER ACCOUNT           NAME  ST REASON    START_TIME                TIME  TIME_LEFT NODES CPUS
 4192018 muarif09 default    dask-worker   R None      2019-03-21T12:39:31       0:07      29:53     1    4
 4192019 muarif09 default    dask-worker   R None      2019-03-21T12:39:31       0:07      29:53     1    4
 4192020 muarif09 default    dask-worker   R None      2019-03-21T12:39:31       0:07      29:53     1    4
 4192021 muarif09 default    dask-worker   R None      2019-03-21T12:39:31       0:07      29:53     1    4

Now if you print the cluster object, you will see available resources have increased.

print(cluster)
SLURMCluster(cores=8, memory=40.00 GB, workers=4/4, jobs=4/4)
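
The totals reported here are the per-worker settings multiplied by the number of workers. A minimal sketch of that arithmetic, where `total_resources` is a hypothetical helper and not a dask-jobqueue function:

```python
def total_resources(workers, cores_per_worker, memory_gb_per_worker):
    """Aggregate cores and memory (in GB) across identical workers."""
    return workers * cores_per_worker, workers * memory_gb_per_worker


# Values from the interactive example: 4 workers, each with 2 cores and 10 GB.
cores, memory_gb = total_resources(4, 2, 10)
print(f"cores={cores}, memory={memory_gb} GB")  # cores=8, memory=40 GB
```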

Connect client to cluster

from distributed import Client
client = Client(cluster)
print(client)

<Client: scheduler='tcp://10.128.1.135:39839' processes=4 cores=8>

Do a simple computation to verify parallelism

Define a function 'slow_increment', which takes a number, sleeps for one second, and returns that number + 1. Then map it over 1000 inputs on the cluster.

import time
def slow_increment(x):
    time.sleep(1)
    return x + 1

futures = client.map(slow_increment, range(1000))

You can check the progress of your computation with the "progress" function.

from dask.distributed import progress
progress(futures)

[##                                      ] | 7% Completed |  5.8s
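
Before sending work to the cluster, it can help to exercise the function on a single machine. The sketch below swaps in the standard library's ThreadPoolExecutor for the Dask client, purely as a local stand-in for illustration; it does not use the cluster. The `delay` parameter and the `run_locally` helper are assumptions added so the check finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_increment(x, delay=0.01):
    """Same logic as above, with a configurable (shorter) delay for local testing."""
    time.sleep(delay)
    return x + 1


def run_locally(n_tasks, max_workers=8):
    """Map slow_increment over range(n_tasks) with local threads; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(slow_increment, range(n_tasks)))


results = run_locally(20)
print(results[:5])  # [1, 2, 3, 4, 5]
```

On the cluster, the equivalent of collecting the results is client.gather(futures), as used in the batch example later on this page.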

Disconnect client

client.close()

Shutdown Dask Cluster

cluster.close()

Submit Python Dask code in Batch Job

Interactive jobs are useful for testing and developing your code. For large computations on a shared resource, it is more efficient to submit your work as a batch job.

Create a Dask jobqueue.yaml file

This file must be located in "~/.config/dask" under your home directory. It defines the default properties of the cluster object.
Create or modify 'jobqueue.yaml' as follows.

muarif092@raad2a:~> mkdir -p ~/.config/dask
muarif092@raad2a:~> cd ~/.config/dask
muarif092@raad2a:~> touch jobqueue.yaml

Edit ~/.config/dask/jobqueue.yaml and add the following contents:

jobqueue:
   slurm:
     name: dask-worker
     # Dask worker options
     cores: 24                   # Total number of cores per job
     memory: 120 GB              # Total amount of memory per job
     processes: 1                # Number of Python processes per job
     interface: ipogif0          # Network interface to use like eth0 or ib0
     death-timeout: 180          # Number of seconds to wait if a worker can not find a scheduler
     local-directory: null       # Location of fast local storage like /scratch or $TMPDIR
     # SLURM resource manager options
     queue: l_long
     walltime: '24:00:00'        # 24-hour walltime (HH:MM:SS). In l_long this can go up to 168:00:00, i.e. 1 week
     job-extra: ['--qos=ll', '-o dask_job.%j.%N.out', '-e dask_job.%j.%N.error']
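
Walltime values in HH:MM:SS form are easy to misread, since hours and minutes differ only by position. The sketch below is a small sanity check you can run before editing the file. `walltime_to_seconds` is a hypothetical helper and handles only the HH:MM:SS form, not the other time formats SLURM accepts.

```python
def walltime_to_seconds(walltime):
    """Convert an 'HH:MM:SS' walltime string to seconds (other SLURM formats are not handled)."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds


print(walltime_to_seconds("24:00:00"))   # 86400 seconds = 24 hours
print(walltime_to_seconds("00:24:00"))   # 1440 seconds = 24 minutes
print(walltime_to_seconds("168:00:00"))  # 604800 seconds = 1 week
```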

Write your Python Dask Code

Sample code [2]. Save it as "dask_sample.py", the file name used by the batch job script below.

# This sample code is obtained and modified from: https://github.com/dask/dask-jobqueue/issues/206

import time
import numpy as np
from distributed import Client
from dask_jobqueue import SLURMCluster
from socket import gethostname

def slow_increment(x):
    m = np.random.random((100000, 10000))
    time.sleep(1)
    return [x + 1, gethostname(), time.time()]

# Cluster configuration is defined in ~/.config/dask/jobqueue.yaml
cluster = SLURMCluster()

# Scale cluster to 2 jobs (one node each)
nodes = 2
cluster.scale(nodes)

# Wait for cluster to start
time.sleep(30)

# Connect client to the cluster
client = Client(cluster)

# Launch computation
futures = client.map(slow_increment, range(10))

# Check progress
from dask.distributed import progress
progress(futures)

# Gather results
result = client.gather(futures)

print('\n::: start user output :::')
print(client)
print('::: end user output :::\n')

# Print results
for record in result:
    print(record)

client.close()
cluster.close()
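
Each record returned by the sample code has the form [value, hostname, timestamp], so you can check how the tasks were spread across nodes by counting records per hostname. A minimal sketch follows; `tasks_per_host` is a hypothetical helper, and the records shown are made-up placeholders rather than real output.

```python
from collections import Counter


def tasks_per_host(records):
    """Count tasks per hostname, given records shaped like [value, hostname, timestamp]."""
    return Counter(hostname for _value, hostname, _timestamp in records)


# Made-up records for illustration; real ones come from client.gather(futures).
sample = [
    [1, "nid00001", 1553160000.0],
    [2, "nid00002", 1553160000.5],
    [3, "nid00001", 1553160001.0],
]
for host, count in sorted(tasks_per_host(sample).items()):
    print(host, count)
```

In the sample script, you could call tasks_per_host(result) after the gather step. If all tasks report the same hostname, the second worker may not have started before the computation was launched.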

Create and submit Slurm Batch Job file

Create a SLURM job file "dask_launcher.job".


#!/bin/bash
#SBATCH -J dask_launcher
#SBATCH -p s_long
#SBATCH --qos sl
#SBATCH --time=168:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --output=dask_launcher.o%j
#SBATCH --error=dask_launcher.e%j
#SBATCH --hint=nomultithread
#SBATCH --gres=craynetwork:0

source /lustre/sw/xc40ac/anaconda3/etc/profile.d/conda.sh

conda activate daskproj

python dask_sample.py


Submit the batch job

sbatch dask_launcher.job

Check the progress of your code

tail -f dask_launcher.o<jid>
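
Here <jid> is the job ID that sbatch prints as "Submitted batch job <jid>" when you submit. If you drive submission from a script, the log file name can be derived from that message. The sketch below assumes the --output pattern from the job file above; `log_name_from_sbatch` is a hypothetical helper and the job ID shown is a placeholder.

```python
import re


def log_name_from_sbatch(sbatch_output, prefix="dask_launcher.o"):
    """Extract the job ID from sbatch's 'Submitted batch job <id>' line and build the log name."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_output)
    if match is None:
        raise ValueError("no job ID found in sbatch output")
    return prefix + match.group(1)


# The job ID below is a placeholder, not a real submission.
print(log_name_from_sbatch("Submitted batch job 1234567"))  # dask_launcher.o1234567
```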

Notes