How to Distribute Machine Learning Workloads with Dask

Tell us if this sounds familiar. You’ve found an awesome data set that you think will allow you to train a machine learning (ML) model that will accomplish the project goals; the only problem is that the data is too big to fit in the compute environment you’re using. In the day and age of “big data,” most might think this issue is trivial, but, like anything in the world of data science, things are hardly ever as straightforward as they seem.

You do have a few options though. You could reach out to your environment admin and request additional resources. Depending on the size and maturity of your company, this could be as easy as sending a brief message over Slack and getting what you need right away. But it could also be as hard as submitting a ticket that requires the approval of your boss’s boss (hopefully you have something else to do for the next couple of weeks). Both of these options could also just as likely result in a “no” purely due to cost constraints, because the size of your data could easily require 256 GB of RAM and no one in charge of the budget is going to want to foot the bill for that beast.

So what do you do? The easiest route (and probably the most common) is to downsample and work within the approved constraints of your environment. But this has a well-known downside, namely THROWING AWAY VALUABLE DATA. You could use Spark, but you’ve heard that the API syntax is tricky to use and that the ML libraries don’t work as well as advertised. In this blog, we’ll show you a better way to distribute your ML workloads, and how Cloudera Machine Learning makes it easy for you.

It’s important to note that the scenario described above is an example of being memory-constrained, but distributing your ML workloads can also be the solution when your workload is compute-constrained, meaning that either the model size or the training complexity is such that execution would take unreasonably long to complete on a single machine.
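A quick way to check whether you are in memory-constrained territory is to estimate the in-memory footprint of a data set before you try to load it. The sketch below assumes a dense table of 8-byte numeric values; the function name and the row and column counts are made up for illustration and do not describe any particular data set.

```python
def estimate_memory_gib(n_rows: int, n_cols: int, bytes_per_value: int = 8) -> float:
    """Approximate in-memory size of a dense numeric table, in GiB."""
    return n_rows * n_cols * bytes_per_value / 1024**3


# Hypothetical data set: 500 million rows of 50 float64 columns.
size_gib = estimate_memory_gib(500_000_000, 50)
print(f"approx. {size_gib:.1f} GiB")
```

Real data sets with strings or mixed types usually take more memory than this estimate suggests, so treat the number as a lower bound.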

Prerequisites

If you want to test out this example for yourself, you will need to have access to either Cloudera Machine Learning (CML) or Cloudera Data Science Workbench (CDSW). If you are not already a Cloudera customer, you can sign up for a test drive today and experience what a first-class hybrid data platform is like.

Dask

In modern computing, there are several options available to developers for distributing their workloads. Dask is a library for parallel processing in Python, with a specific focus on analytic and scientific computing. Compared to Spark, it is a more familiar option to Python-oriented data scientists for parallel computation.

Dask enables data scientists to scale the Python libraries that they are already familiar with, like NumPy, pandas, scikit-learn, and XGBoost. Each of these common Python libraries has a Dask equivalent. For example, if you wanted to use the Dask equivalent of pandas, you would use import dask.dataframe as dd instead of the old reliable import pandas as pd. There are some slight differences between the two libraries due to the parallel nature of Dask, but in the vast majority of situations you can simply use dd the same way you would use pd.

To begin, we need to import the following dependencies:



import os
import time
import dask
import dask.array as da
import dask.dataframe as dd
import dask_ml as dm
import dask_ml.datasets
import dask_ml.linear_model
from dask.distributed import Client

Launching workers in Cloudera Machine Learning

Cloudera Machine Learning (CML) provides basic support for launching multiple engine instances, known as workers, from a single session. This capability, combined with Dask, forms the foundation for easily distributing data science workloads in CML. To access the ability to launch additional workers, simply import the cdsw library.



import cdsw

Set up a Dask cluster

Dask distributes workloads across a cluster of machines. This cluster consists of three different components: a centralized scheduler, one or more workers, and one or more clients, which act as the user-facing entry point for submitting tasks to the cluster. With CML, we can launch the components of the cluster as CDSW workers.
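If you want to see the three roles in isolation before launching anything in CML, the sketch below runs all of them inside a single local process using LocalCluster from dask.distributed. This is a local stand-in, not the CML setup described in the rest of this post; the square function and the worker counts are illustrative.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    return x * x


# LocalCluster starts a scheduler plus two workers inside this process.
# dashboard_address=None skips the diagnostics dashboard for this demo.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
) as cluster:
    # The client is the entry point: it submits tasks to the scheduler,
    # and the scheduler assigns them to the workers.
    with Client(cluster) as client:
        client.wait_for_workers(n_workers=2, timeout=30)
        futures = client.map(square, range(5))
        squares = client.gather(futures)

print(squares)
```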

Start a Dask scheduler

We start a Dask scheduler as a CDSW worker process. We do this with cdsw.launch_workers, which spins up another session on our cluster and runs the command we provide—in this case the Dask scheduler. The scheduler is responsible for coordinating work between the Dask workers we will attach. Later we’ll start a Dask client in this notebook. The client talks to the scheduler, and the scheduler talks to the workers.



dask_scheduler = cdsw.launch_workers(
    n=1,
    cpu=1,
    memory=2,
    code="!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090",
)

# Wait for the scheduler to start.
time.sleep(10)

We need the IP address of the CML worker with the scheduler on it, so we can connect the Dask workers to it. The IP is not returned in the dask_scheduler object (it’s unknown at the launch of the scheduler), so we scan through the worker list and find the IP of the worker with the scheduler ID. This returns a list, but there should be only one entry.



scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]["id"]
scheduler_ip = [
    worker["ip_address"] for worker in scheduler_workers if worker["id"] == scheduler_id
][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"

Start the Dask workers

We’re now ready to expand our cluster with Dask workers. We start some more CML workers, each with one Dask worker process on it. We pass each worker the scheduler URL we just found so that it can register with the scheduler, which then distributes work to the workers.

N_WORKERS determines the number of CML workers started (and thus the number of Dask workers running in those sessions). Increasing the number starts more workers, which generally reduces wall-clock time but uses more cluster resources. Exercise caution and good judgment.



N_WORKERS = 3

dask_workers = cdsw.launch_workers(
    n=N_WORKERS,
    cpu=1,
    memory=2,
    code=f"!dask-worker {scheduler_url}",
)

# Wait for the workers to start.
time.sleep(10)
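Before raising N_WORKERS, it can help to total up what the cluster will request. The sketch below is simple arithmetic using the cpu and memory values from the launch calls above; cluster_footprint is a hypothetical helper, we assume the memory values are in GB, and the total excludes the session that hosts the client.

```python
def cluster_footprint(n_workers, worker_cpu=1, worker_memory_gb=2,
                      scheduler_cpu=1, scheduler_memory_gb=2):
    """Total CPU and memory requested by one scheduler plus n_workers workers."""
    return {
        "cpu": scheduler_cpu + n_workers * worker_cpu,
        "memory_gb": scheduler_memory_gb + n_workers * worker_memory_gb,
    }


for n in (3, 6, 12):
    print(n, cluster_footprint(n))
```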

Connect the Dask client

We now have a Dask cluster running and distributed over CML sessions. Next we can start a local Dask client and connect it to our scheduler. The Dask client sends instructions to the scheduler and collects results from the workers. This is the connection that lets us issue instructions to the Dask cluster as a whole.



client = Client(scheduler_url)

The Dask scheduler actually hosts a dashboard so we can monitor the work it’s doing. To access it we will construct the URL of the dashboard, which is hosted on the scheduler worker. Clicking the output of this print statement will open the dashboard in a new browser window.



print(dask_scheduler[0]["app_url"] + "status")

To recap, we’ve set up the Dask scheduler, workers, and client, which means that we now have everything we need to distribute our machine learning workloads. Let’s try doing some data science!

Do some data sciencey stuff!

As mentioned before, Dask provides distributed equivalents to several popular and useful libraries in the Python data science ecosystem. Here we’ll give a very brief demo of the Dask equivalents of NumPy (Dask Array) and pandas (Dask DataFrames).

Dask Array



# create a random multidimensional array with Dask Array
array = da.random.random((10000, 10, 10000), chunks=1000)

# these manipulations do not carry any special meaning
array = (
    da.reshape(array, (10000, 100000))  # reshape the array
    .T                                  # transpose it
    [:10, :1000]                        # keep the first 10 rows and first 1000 columns
)

# create singular value decomposition of the transformed array
u, s, vh = da.linalg.svd(array)

# the arrays we just defined are lazy, so call .compute() to access their contents
s.compute()
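The laziness mentioned in the last comment is easier to see on a smaller example. In the sketch below (array shape and chunk sizes chosen purely for illustration), building the expression only records a task graph over the chunks; the arithmetic happens when .compute() is called.

```python
import dask.array as da

# Building the expression is cheap: nothing is computed yet.
x = da.ones((1000, 1000), chunks=(250, 250))
total = (x + 1).sum()

print(type(total).__name__)  # still a Dask array, not a number
print(x.numblocks)           # how many chunks along each axis

# .compute() executes the task graph and returns a concrete value.
result = total.compute()
print(result)
```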

Dask DataFrames

Dask DataFrames are extremely similar to pandas DataFrames. In fact, Dask is really just coordinating pandas objects under the hood. As such, we have access to most of the pandas API, with the caveat that operations will be faster or slower depending on their degree of parallelizability.



# dask provides a handy dataset for demo-ing itself
df = dask.datasets.timeseries()
print(df.head())

# we can run standard pandas operations, like finding the unique values of a column
names = df["name"].unique().values
print(names.compute())

# we can chain operations
# once compute is called, we’re left with a pandas df
df[(df.name == "Oliver")][["x", "y"]].cumsum().compute().plot()

Shut it down

Now that we’re done computing with our Dask cluster, we should shut down those workers to free up resources for others and to avoid additional costs (your IT administrators will thank you).



cdsw.stop_workers(*[worker["id"] for worker in dask_workers + dask_scheduler])
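If a computation fails partway through, it is easy to forget this last step and leave workers running. One option is to wrap launch and shutdown in a context manager so that the workers are stopped even when an exception is raised. The sketch below is an assumption-laden illustration: managed_workers is a hypothetical helper, and it uses stand-in functions in place of cdsw.launch_workers and cdsw.stop_workers so that it runs outside CML.

```python
from contextlib import contextmanager


@contextmanager
def managed_workers(launch, stop, **launch_kwargs):
    """Launch workers, yield them, and always stop them afterwards."""
    workers = launch(**launch_kwargs)
    try:
        yield workers
    finally:
        stop(*[worker["id"] for worker in workers])


# Stand-ins for cdsw.launch_workers and cdsw.stop_workers.
stopped_ids = []


def fake_launch(n, **_):
    return [{"id": f"worker-{i}"} for i in range(n)]


def fake_stop(*ids):
    stopped_ids.extend(ids)


try:
    with managed_workers(fake_launch, fake_stop, n=2, cpu=1, memory=2) as workers:
        raise RuntimeError("simulated failure mid-computation")
except RuntimeError:
    pass

print(stopped_ids)
```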

Conclusion

Everything that we went through today is available via one of our newest Applied Machine Learning Prototypes (AMPs): Distributed XGBoost with Dask on CML. This AMP, like all AMPs, is 100% open source, so anyone can check out the complete prototype for themselves. Of course, Cloudera customers get the added benefit of being able to deploy any AMP with a single click, which means you can get up and running with a Dask cluster for yourself and explore how XGBoost performs when distributed.

AMPs are fully built ML projects that can be deployed with one click directly from CML. AMPs enable data scientists to go from an idea to a fully working ML use case in a fraction of the time. They provide an end-to-end framework for building, deploying, and monitoring business-ready ML applications instantly.

Click here if you want to learn more about what is possible with Applied Machine Learning Prototypes from Cloudera.

The post How to Distribute Machine Learning Workloads with Dask appeared first on Cloudera Blog.
