Distributed computing using Python/Dask on the CC-IN2P3 Grid Engine

aka Distribute native python code on cluster without pain!

Author: Clément Buton

Introduction

This page summarize my feedback after a month of use / exploration of the python Dask framework.

My goal was to validate the use of the Dask framework for the implementation of the Euclid/NISP infrared sensors characterisation pipeline on the CC-IN2P3 infrastructure.

Appart from a basic introduction to some various aspects of the libraries I used, I will provide a Python 3 artificial example of distributed computing specific to the CC-IN2P3.

Setup and requirements

Account and access rights

Obviously, if you want to run jobs on the CC-IN2P3 Grid Engine (GE) batch system, you first need an account there...

Prerequisite python libraries

In order to be able to distribute your code on the cluster, some third party libraries are required. These can be installed easily on a virtual environment with conda (cf subsection below).

  • bokeh (optional) a powerfull html based graphic library used for the real time monitoring interface (very useful).
  • dask obviously.
  • dask.distributed lightweight library for distributed computing in Python. It extends both the concurrent.futures and Dask APIs to moderate sized clusters.
  • dask.jobqueue allows to easily deploy Dask on job queuing systems like PBS, Slurm, MOAB, SGE, and LSF.
  • numpy the fundamental package for any scientific computing with python.
  • xarray (optional) Python package that makes working with labelled multi-dimensional arrays simple, efficient, and fun! Works painlessly with Dask/numpy/pandas.

Virtual environment with conda

Conda is an open source package management system and environment management system. It allows among other things to create with ease virtual environments in python.

I will not come back here on the usefullness of virtual environements. I you are not yet conviced, check out this website.

I strongly advise to use miniconda instead of the full conda distribution. Miniconda is much lighter and allows to install only the required libraries.

The following conda installation procedure is specific for a linux based distribution, although it should be roughly the same on most systems.

You will find below the instructions to get a simple python 3 virtual environment with all needed libraries to execute basic jobs using Dask on the CC-IN2P3 cluster:

>>> wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda.sh
>>> bash ~/miniconda.sh -b -p $HOME/.miniconda
>>> echo 'export PATH=${HOME}/.miniconda/bin:${PATH}' >> ~/.bashrc
>>> echo '. $HOME/.miniconda/bin/activate' >> ~/.bashrc
>>> conda create -n py36 python=3.6 numpy dask distributed bokeh click graphviz python-graphviz xarray ipython[all] matplotlib[all]
>>> conda activate py36   # Activate your virtual environment
>>> pip install zarr dask-jobqueue  # zarr &dask-jobqueue are not yet available on the offical anaconda repository

Dask

Dask is a flexible library for parallel computing in Python. It is composed of two parts, a dynamic task scheduling optimized for computation and "Big Data" collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.

A full Dask presentation from its main developer Matthew Rocklin is available online and can be viewed here.

The dask project documentation is complete and very well done. If you are seriously thinking to use Dask, I strongly recommend to get started and read it.

User interfaces

Their are two main approches of the user interface called high level and low level.

We will mainly discuss here of the high level Arrays which can be interpreted as out of core NumPy Arrays with task scheduling built on top of NumPy. You can visit the online user interface Dask documentation page for more information about the several other interfaces available (e.g. DataFrames built on top of Pandas or XArray that we will discuss later)

The Dask Arrays have an API very similar to the Numpy Arrays:

# Arrays
import dask.array as da
x = da.random.uniform(low=0, high=10, size=(10000, 10000),  # normal numpy code
                      chunks=(1000, 1000))  # break into chunks of size 1000x1000

y = x + x.T - x.mean(axis=0)  # Use normal syntax for high level algorithms

The important new information here is the chunks option which specify how you want to cut your data in small pieces to be distributed, and the fact that the computation is not performed on the spot, that is to say the data is not stored in memory at this point. It is called lazy computing.

Laziness and Computing

Most Dask user interfaces are lazy, meaning that they do not evaluate until you explicitly ask for a result using the compute method:

# This array syntax doesn't cause computation
y = x + x.T - x.mean(axis=0)

# Trigger computation by explicitly calling the compute method
y = y.compute()

If you have multiple results that you want to compute at the same time, use the dask.compute function. This can share intermediate results and so be more efficient:

# compute multiple results at the same time with the compute function
min, max = dask.compute(y.min(), y.max())

Note that the "compute()" function returns in-memory results. It converts Dask DataFrames to Pandas DataFrames, Dask arrays to NumPy arrays, and Dask bags to lists. You should only call compute on results that will fit comfortably in memory. If your result does not fit in memory, then you might consider writing it to disk instead.

# Write larger results out to disk rather than store them in memory
my_dask_dataframe.to_parquet('myfile.parquet')
my_dask_array.to_hdf5('myfile.hdf5')
my_dask_bag.to_textfiles('myfile.*.txt')

Dask on Cluster

You can perfectly use Dask localy on your laptop but at some point you might want to scale up to a full cluster.

In that case, you should use Dask.distributed which is a lightweight library for distributed computing in Python. It extends both the concurrent.futures and dask APIs to moderate sized clusters.

Although you could code it yourself using Dask.distributed ressources, I suggest to use third party libraries to deploy Dask on job queuing systems like PBS, Slurm, MOAB, SGE, and LSF.

Note

As mentionned, the CC-IN2P3 job queuing system (GE) is based on SGE

Cluster deployments

There are two libraries out there you can use for Cluster deployments: Dask Jobqueue and DRMAA. I tried both and strongly advise to use Dask Jobqueue which is more supported and stable. Luckily for us, the the Grid Engine (GE) batch system used at the CC-IN2P3 is fully implemented in Dask Jobqueue and can be used as it is.

SGECluster configuration

If you want to use the Grid Engine batch system at the computing center with Dask, you first need to understand how to use it "the normal way" (i.e. using commands like qsub, qstat, qdel, ...). Especially, you need to know about the computing ressources you will use/need and which queue is the most appropritate for your use.

Warning

According to my experience, the smoothness of the Dask usage at the computer center is extremely related to the configuration of the cluster. I strongly recommend that you spend some time to scale your computing needs properly prior to run any job.

All in all, you need to specify the following keyword arguments to the Cluster constuctor in order to make it work properly:

  • name: String

    Name of Dask workers. Important if you want to kill some jobs with the \'qdel\' command for example.

  • cores: Integer

    Number of cores associated to a single job. It does not mean the total amount of cores that you want for your full deployment. The number specified here will have a strong influence on the wait for a free spot according to the CC-IN2P3 rules.

  • processes: Integer

    By default Dask will run one Python process per job. However, you can optionally choose to cut up that job into multiple processes using the processes configuration value. This can be advantageous if your computations are bound by the GIL, but disadvantageous if you plan to communicate a lot between processes. Typically we find that for pure Numpy workloads a low number of processes (like one) is best, while for pure Python workloads a high number of processes (like one process per two cores) is best. If you are unsure then you might want to experiment a bit, or just choose a moderate number, like one process per four cores.

    Note

    So far I found that I obtained the most stability and robustness with a single process. Yet, I am using mostly vectorized numpy functions.

  • memory: String

    The amount of memory per job you must specify is directly linked to the queue you chose (Max virtual Memory).

  • interface: String

    Network interface. Probably best left alone, since it should be done automatically at the CC.

  • death_timeout: Integer

    Seconds to wait for a scheduler before closing workers.

  • queue: String

    You must specify the queue scaled for your computing and for which you have access rights (default queue is \'long\'). **The specified queue should be in accordance with memory values!**

  • project: string

    "P_[official name of your project at CC]"

  • walltime: Integer

    Walltime for each worker job.

  • resource_spec: String

    Request resources and specify job placement. Passed to "#\$ -l option". The 'h_rss' option (Max Resident Memory) should be specified here.

  • env_extra: List of strings

    Other commands to add to script before launching worker.

Warning

The Max Resident Memory can be a very restrictive parameter for processes that have high memory usage peaks.

Example

Here is a descriptive example of how the libraries work together.

from dask_jobqueue import SGECluster
cluster = SGECluster()    # This need to be properly configured
cluster.scale(10)         # Ask for ten workers

from dask.distributed import Client
client = Client(cluster)  # Connect this local process to remote workers

# wait for jobs to arrive, depending on the queue, this may take some time

import dask.array as da
x = ...                   # Dask commands now use these distributed resources

Dask scaling limits

It is not very clear to me up to what point can we scale using Dask. In the documentation it is said to be adapted to scale from single machine up to thousand-node clusters and 100+TB datasets when needed (with the same API).

As far as I am concerned, I am trying to wright a pipeline for modestly sized datasets of 50 to 300GB for a total amount of 0.5PB.

This post from Matthew Rocklin discuss the scaling limits of Dask and shows a nice video example of a simple computation using 50 36-core nodes on an artificial terabyte dataset.

Bokeh web interface

Dask.distributed includes a web responsive interface to help deliver this information over a normal web page in real time. On a local cluster, this web interface is launched by default wherever the scheduler is launched if the scheduler machine has Bokeh installed (do not miss the introduction video on the Dask.distributed documentation!)

This interface is extremely helpful not only in terms of monitoring but also in terms of debbuging and performance enhancements. In my humble opinon, it is a must have/use.

At the CC-IN2P3 it is a bit trickier to get the web interface running (basiccaly you need to use port forwarding). It will be demonstrated in the interactive_example section.

Zarr

Although the Dask developpers encourage to store and load data using Apache Parquet, they have (very) recently implemented a new interface towards the Zarr format which becomes a widely used format for storing distributed data. So far, I am using it for every intermediate data during the computation and I am very happy with it.

Xarray

xarray is a library which allows to label multi-dimensional arrays. It introduced dimensions, coordinates, and attributes on top of raw NumPy-like arrays. Xarray is able to understand these labels and use them to provide a more intuitive, more concise, and less error-prone experience. Xarray was inspired by and borrows heavily from the pandas API. In some way it replaces the deprecated pands.NDPanels.

xarray integrates with Dask to support parallel computations and streaming computation on datasets that don't fit into memory.

Currently, Dask is an entirely optional feature for xarray. However, the benefits of using Dask are sufficiently strong that Dask may become a required dependency in a future version of xarray.

Example

Here is a basic example of how to create a very simple 3D xarray.DataArray which could be compared to a labeled numpy.array.

import xarray as xr
data = xr.DataArray(np.random.randn(2, 2, 3),
            coords={'x': ['a', 'b'],
                    'y': range(2),
                    'z': range(6,9)},
            dims=('x', 'y', 'z'))
data

<xarray.DataArray (x: 2, y: 2, z: 3)>
array([[[-0.806286, -0.527686,  0.741435],
        [ 0.532388,  0.009704,  0.983884]],

       [[ 0.693561,  0.121124,  0.280036],
        [ 0.027191, -0.051463, -1.315291]]])
Coordinates:
  * x        (x) <U1 'a' 'b'
  * y        (y) int64 0 1
  * z        (z) int64 6 7 8

Interactive example at CC-IN2P3

Make sure you have a virtual environment set up with all libraries installed as specified in Virtual environment with conda on your account at the CC-IN2P3.

For this example we will connect to the interactive queue on a CC-IN2P3 machine in order to run the scheduler. For that, just type:

>>> qlogin

Then, enter your password and you should be connected shortly to a machine with a name starting with 'ccwige' followed by a four digits number hereafter called XXXX: [username]\@ccwigeXXXX .

In order to have the Bokeh web interface running at the same time, open another terminal window, connect once again to the CC-IN2P3 and start a web browser in the background from there, for example:

>>> firefox &

At this point you should see a remote firefox window opening on your desktop. Then, on the same terminal create a ssh tunnel to forward the port 8787 of ccwigeXXXX towards the port 8000 of the localhost, like this:

>>> ssh -L 8000:localhost:8787 [username]@ccwigeXXXX.in2p3.fr

Do not touch this terminal window again and go back to the one where you are connected to the interactive queue. Then, activate your virtual environment, create a 'tests' repository and start an IPython console:

>>> mkdir tests
>>> cd tests
>>> conda activate py36
>>> ipython

From there, import the needed libraries and setup your Cluster:

import numpy as np

from dask.distributed import Client
from dask_jobqueue import SGECluster
cluster = SGECluster(name= 'my-dask-worker', cores=8 ,memory='16GB', queue='long', ressource_spec='h_rss=4G')
cluster.scale(64) # Ask for 64 workers

What you just did is ask for 64 workers of 8 cores each configured for the \'long\' queue. Now start the distributed client like this:

client = Client(cluster, processes=False)

Now that the client is operational, you can go to the previously opened web browser and enter the adress of the dask web interface monitoring status page: http://localhost:8000/status

Go back to your IPython console, it is time to play with dask. As for now you should also keep an eye on the web browser as well.

import dask.array as da
data = da.random.uniform(low=0, high=10, size=(1000, 2000, 2000),  # normal numpy code
                         chunks=(1000, 64, 64))  # break into chunks of size 1000x64x64

You just created a random lazy 3D array of 32GB divided into 64MB chunks. It clearly does not fit in a single worker memory. What we want to do then is scatter the chunks across the cluster. For that we can use the asynchronous method 'persist()' as follows:

data = data.persist()

You should see now the real time evolution of the process on the status page where the workers are filling up with data chunks. After a little while your data set is in memory, distributed across the cluster.

Note

If you did not wait for all the workers to be up and ready you might witness some data 'rebalancing' during the process until the data is more or less homogeneously distributed among the workers.

Note

Do note use 'compute' as long as the array does not fit within the max virtual memory of a single worker (cf. doc). If you want to keep a result that does not fit in a single worker memory, you should write it on disk or ask with a queue with more memory.

To treat your data, you have access to the entire top level user functions of the dask.DataArray API. For example, we can compute the mean over the first axis as follows:

results = data.mean(axis=0).persist()

Note

As we already said the Dask API is very similar to numpy\'s and you have then access to a lot of methods to apply to 'data'

If you do not intend to use some \'heavy\' variables you can always delete them in order to reduce the memory footprint on the cluster.

del data

Tha resulting 2D array is small enough to be \'computed\' or it can be saved directly to disk. Let\'s finish this example by saving the reduced array into a \'zarr\' file.

from dask.distributed import fire_and_forget
fire_and_forget(results.to_zarr('./res.zarr'))

Note

You will notice that the ZARR format keep the chunking information.

When done, do not forget to close the client and cluster properly.

client.close()
cluster.close()

Note

I would like to insist on the importance of chunks selection. Bad chunking choice can have a great influence on the data transfer between workers (I/O) which is very costly in terms of performance.

Note

You might have noticed that most of the examples of dask and its web interface available on the web are using JuPyteR. JuPyteR lab is a very nice tool indeed and very well integrated with dask (cf. this nice example yet again from Matthew Rocklin). On a local cluster, it is probably a very good idea to test your workflow with such a tool. Nevertheless, at the CC-IN2P3 it is not that easy, even so you could propably use JuPyteR hub.

Using Xarray

The same kind of example can be performed using Xarray. Instead of using a dask.array, we use an xarray.DataArray which works very well together with dask. Let\'s assume the cluster and client are already initialized as done previously. You can transform your data using xarray like this:

import dask.array as da
data = da.random.uniform(low=0, high=10, size=(1000, 2000, 2000),  # normal numpy code
                         chunks=(1000, 64, 64))  # break into chunks of size 1000x64x64

import xarray as xr
data = xr.DataArray(data,
                    dims=('t', 'y', 'x'),
                    coords={'t': np.arange(1000, dtype=np.uint16),
                            'y': np.arange(2000, dtype=np.uint16),
                            'x': np.arange(2000, dtype=np.uint16)},
                    name='test')

The data can the be scattered through the cluster the same way:

data = data.persist()

Let\'s imagine that we do not want to use a top level user function of the dask.DataArray API, but perform instead a linear fit on the 'time' axis of the data cube. We can define a pure numpy function which will be apllied along each chunk using \'xr.apply_ufunc\' as follows:

def slopes(x, y):
     """.

     :param x: abscissa vector
     :param y: data cube
     :returns: slope matrix

     """

     y = np.moveaxis(y, 2, -3)

     return np.polynomial.polynomial.polyfit(
         x=x, y=y.reshape(y.shape[-3], y.shape[-2] * y.shape[-1]), deg=1)[-1]. \
         reshape(y.shape[-2], y.shape[-1])

 # Polynomial adjustment. Retrieve the linear coefficients map.
 futures = xr.apply_ufunc(slopes, data.t, data,
                          input_core_dims=[('t', ), ('t', )],
                          dask='parallelized',
                          keep_attrs=False,
                          output_dtypes=[np.float64]).persist()

 # Gather coefficients (slope) map from the cluster
 results = client.gather(futures)

 # DataArray do not have a zarr interface yet: switch to xarray.Dataset
 results = results.to_dataset(name='test')

 # Write results on disk
 from dask.distributed import fire_and_forget
 fire_and_forget(results.to_zarr('./res.zarr', mode='w', group='/slopes'))

Home made decorator to simplify things a bit

I wrote a very simple configurable decorator to speed up code writing a bit. Feel free to use it as you please.

Note

The default values specified in the following decorator correpsond to a generic \'long\' queue job. It must be adapted to your specific needs.

# Generic imports
import click

from dataclasses import dataclass, field
from functools import update_wrapper

# Dask imports
from dask.distributed import Client
from dask_jobqueue import SGECluster

__license__ = "2018, Clément Buton"
__author__ = 'Clément Buton <c.buton@ipnl.in2p3.fr>'
__date__ = '2019/01/28 12:33:41'

@dataclass
class on_SGECluster(object):
    """Decorator for running code on SGE cluster (i.e. CC-IN2P3).

    :param name: Job/Dask workers name
    :param cores: Number of cores
    :param processes: Number of processes
    :param memory: Amount of memory according to queue
    :param interface: Network interface
    :param death_timeout: Worker time out
    :param queue: CC-IN2P3 waiting queue
    :param project: Official CC project name starting with 'P_'
    :param walltime: Walltime for each worker job.
    :param resource_spec: Request resources and specify job placement (Passed to #$ -l option)
    :param kwargs: Additional keyword arguments to pass to LocalCluster
    :param env_extra: Other commands to add to script before launching worker
    :param min_workers: Minimum number of workers
    :param max_workers: Maximum number of workers
    :param adapt: If True, adapt automagically the number of workers between min and max

    """

    name: str = 'test-dask-worker'
    cores: int = 8
    # processes: int = 1
    memory: str = '10GB'
    # interface: str = 'em1'
    death_timeout: int = 60
    queue: str = 'long'
    project: str = 'test'
    walltime: str = '01:00:00'
    resource_spec: str = 'h_rss=4G'
    kwargs: dict = field(default=dict)
    env_extra: list = field(default_factory=list)
    min_workers: int = 16
    max_workers: int = 64
    adapt: bool = False

    def __call__(self, func, *args, **kwargs):
        """."""

        @click.pass_context
        def wrapper(ctx, *args, **kwargs):
            """."""

            # Dask Jobqueue SGE Cluster
            with SGECluster(name=self.name,
                            cores=self.cores,
                            # processes=self.processes,
                            memory=self.memory,
                            # interface=self.interface,
                            death_timeout=self.death_timeout,
                            queue=self.queue,
                            project=self.project,
                            walltime=self.walltime,
                            resource_spec=self.resource_spec,
                            kwargs=self.kwargs,
                            env_extra=self.env_extra) \
                    as cluster:

                # Adaptative number of workers
                if self.adapt:
                    cluster.adapt(minimum=self.min_workers,
                                  maximum=self.max_workers)

                else:
                    cluster.scale(self.max_workers)

                # Client
                with Client(cluster, processes=False) as client:

                    # wait for jobs to arrive, depending on the
                    # queue, this may take some time

                    ctx.obj['client'] = client
                    ctx.invoke(func, *args, **kwargs)

        return update_wrapper(wrapper, func)

Then you just have to decorate the function you want to distribute on the cluster, like this:

@on_SGECluster()
def test(ctx):
    """Test function.

    :param ctx: `click` context. Trick to retrieve variables
                defined within the decorator.

    """

    client = ctx.obj['client']  # retrieve the `Dask.distributed.Client` from within the decorator

   data = ...

test() # This will be distributed on the defined cluster!

Conclusion & perspective

As I said, I only started to use Dask and Xarray a month ago. There are still a lot functionnalities to explore and more room for optimization and robustification. Although I probably will switch from Pandas to Xarray very soon.

I will try to update this post regularily according to the progress I will make on the understanding of the framework.

Note

For some applications the CC-IN2P3 might not be the best place to be due for example to some limitations like the amount of memory per node.

I have not had time yet to explore the possibilities of Dask concerning the parallelisation on GPUs as well as its usage in machine learning. Although I hope I will have some time soon to do it, you will find in the sub section bellow some usefull links concerning these topics.

Dask on GPUs

Very recent development associating Dask with GPUs CuPy:

https://matthewrocklin.com/blog//work/2019/01/03/dask-array-gpus-first-steps

Dask and Machine learning

Dask is widely used together with scikit learn in the machine learning community:

https://ml.dask.org

A few words from Matthew Rocklin on Dask used with TensorFlow:

http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow

Tip and tricks

Mastering dask can be tricky performance wise, this page is full of interesting tips and advices that I recommend to read.

All in all, I think it is helpful to remind that if your data fits in memory, do not use Dask since scheduling workers ultimately takes time.

Contact

If you have any question, feel free to call, email or visit me.

Dr. Clément BUTON
group : Cosomology/Euclid (bureau 307)
phone : +33 (0) 4 72 43 10 74
email : c.buton@ipnl.in2p3.fr