Just Start with the Dask LocalCluster

This is the first article in an ongoing series on using Dask in practice. Each article in the series will be simple enough for beginners but will still provide useful tips for real work.

There are many ways to run Dask clusters. This article urges you to start as simply as possible and walks through easy ways to do just that.

Go simple whenever possible

We host and manage multi-node Dask clusters, and these clusters can even use GPUs. With Saturn, users can access a cluster with hundreds of GPUs in a few clicks.

But wait.

Just because you have a Ferrari doesn’t mean you should drive it to the grocery store.

Simpler is usually better.

Your order of scaling should be:

  1. Start with Pandas. That’s usually enough.
  2. Running out of memory? Use Pandas with a bigger machine. Saturn now has Jupyter instances with 4 terabytes of RAM.
  3. Try a Dask LocalCluster.
  4. Try a multi-node (and possibly multi-GPU) Dask cluster.

In general, the simpler your stack, the less time you’ll spend tinkering and the more time you’ll spend being productive. I’m writing this article because I’ve talked with users who run multi-node clusters when they don’t need to. That’s partly our fault: we make them really easy to spin up.

Why LocalCluster?

Your computer has multiple cores (mine has 4). If you’re writing regular Python code, you’re probably using only 1 of those cores. Some of your code (specifically code that calls NumPy for things like matrix multiplication) already uses multiple cores, because NumPy knows how to do so. Python, however, won’t automatically run the rest of your code on multiple cores.

Using a Dask cluster (along with a library that knows how to parallelize work over Dask) lets you take advantage of all the cores on your system. Dask comes with parallelized modules such as dask.dataframe and dask.array, and other libraries like Xarray and dask-ml also integrate with Dask.

Dask can help you scale to enormous datasets with multi-node clusters. It can also help you take advantage of all the cores on your computer with LocalCluster.

LocalCluster is easier than a multi-node Dask cluster

It’s just easier to work with a single machine. On a single machine, htop can help you understand how your system is doing. On a single machine, you don’t have to worry about copying your code or data files to a cluster of computers. On a single machine, you can see all the logs for your entire Dask cluster in the terminal.

What about multiprocessing?

Python’s multiprocessing module is great for embarrassingly parallel problems, where the same function runs independently on many inputs. For problems that aren’t embarrassingly parallel, you’ll need a tool like Dask that knows how to parallelize complex operations over multiple cores. Since Dask can handle both types of problems, you only have to learn one syntax. Plus, you get the fabulous Dask dashboard to monitor your work.
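As a toy illustration of the difference, here is a sketch using dask.delayed. The load and clean steps are independent of each other (the embarrassingly parallel part), while the combine step depends on all of them, and Dask works out the ordering from the task graph. The load, clean, and combine functions are hypothetical stand-ins for real work.

```python
from dask import delayed

@delayed
def load(i):
    # Hypothetical loader: stands in for reading one file or chunk.
    return list(range(i * 10, (i + 1) * 10))

@delayed
def clean(chunk):
    # Embarrassingly parallel step: each chunk is processed independently.
    return [x for x in chunk if x % 2 == 0]

@delayed
def combine(chunks):
    # Dependent step: needs the results of every clean() call.
    return sum(sum(c) for c in chunks)

# Nothing runs yet: calling delayed functions only builds a task graph.
cleaned = [clean(load(i)) for i in range(4)]
total = combine(cleaned)

# compute() executes the graph, running independent tasks in parallel.
result = total.compute()
print(result)
```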

How do I use LocalCluster?

There are many ways to use LocalCluster. I’ll show you the one I recommend.

What not to do

Dask makes it really, really easy to get started. Create a Dask client, and you’ve automatically got a LocalCluster:

>>> from dask.distributed import Client
>>> c = Client()
>>> c.cluster
LocalCluster(16ff34f9, 'tcp://127.0.0.1:35257', workers=4, threads=8, memory=41.92 GB)
>>>

You can also create a LocalCluster explicitly in Python:

>>> from dask.distributed import Client
>>> from dask.distributed import LocalCluster
>>> cluster = LocalCluster()
>>> client = Client(cluster)

I use this approach when I deploy production data pipelines, but I don’t like it for research. If you have multiple Python sessions or notebooks, this approach makes it easy to accidentally end up with multiple clusters on your computer. It also makes it harder to view the logs and to find the address of the Dask dashboard.
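For the production-pipeline case, one tidy way to create a LocalCluster from Python is with context managers, so the cluster shuts down when the block exits and re-running a script doesn’t leave old clusters behind. This is a sketch under stated assumptions: the worker and thread counts are illustrative, and processes=False (workers as threads inside one process) is only there so the demo runs in any environment; drop it to get the default separate worker processes.

```python
from dask.distributed import Client, LocalCluster

# Illustrative sizing: 2 workers with 1 thread each.
# processes=False keeps the workers inside this process for the demo;
# omit it to get separate worker processes (the LocalCluster default).
with LocalCluster(n_workers=2, threads_per_worker=1,
                  processes=False) as cluster, Client(cluster) as client:
    client.wait_for_workers(2)

    # Where to find the Dask dashboard for this cluster.
    print("Dashboard:", client.dashboard_link)

    # A trivial task to confirm the workers respond.
    answer = client.submit(sum, [1, 2, 3]).result()
    n_workers = len(client.scheduler_info()["workers"])

# Exiting the with-block closes the client and shuts down the cluster.
print(answer, n_workers)
```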

What to do

Open a terminal.

$ dask-scheduler

Open a second terminal.

$ dask-worker tcp://127.0.0.1:8786

In your Python session (possibly a Jupyter notebook), execute the following:

from dask.distributed import Client
client = Client('tcp://127.0.0.1:8786')

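There you go. The scheduler listens on port 8786 by default, so tcp://127.0.0.1:8786 reaches it from the same machine, and that second command was enough to start a Dask worker connected to the scheduler (by default, one worker process that uses all of your cores as threads). The best part of this approach is that the worker logs for all of your work, including anything your tasks print, show up in the second terminal, and the scheduler’s startup logs in the first terminal include the dashboard address. To shut everything down, just hit Ctrl-C in both terminals.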

Processes and Threads

Python has this annoying thing called the global interpreter lock (GIL), which lets only one thread execute Python code at a time. You don’t need to understand much more than that, except that it means Python can’t really use multiple threads to speed up pure-Python code. The general exceptions to this rule are code that mostly does I/O (downloading data, for example) and code that spends most of its time in C, C++, or other non-Python libraries (NumPy, for example).
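The I/O exception is easy to see with nothing but the standard library. In this sketch, time.sleep stands in for waiting on a download (an assumption for illustration); like real blocking I/O, it releases the GIL, so four threads can wait at the same time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(i):
    # Stand-in for I/O: sleeping releases the GIL, much like waiting on a socket.
    time.sleep(0.5)
    return i

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_download, range(4)))
elapsed = time.perf_counter() - start

# The four 0.5 s waits overlap, so this takes roughly 0.5 s instead of 2 s.
print(results, f"{elapsed:.2f}s")
```

Swap the sleep for a CPU-bound pure-Python loop and, in standard CPython builds, the threads no longer overlap, which is why processes are usually the better default for that kind of work.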

Most users we’ve worked with are best served by using processes rather than threads (you can mix the two as well). Here’s how I set this up on my machine:

  1. I have 40 GB of RAM (Thank you System76!!)
  2. I have 4 cores, so I want 4 workers.
  3. This means each worker can consume 10 GB of RAM.

$ dask-worker tcp://127.0.0.1:8786 --nprocs 4 --memory-limit 10GB
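The arithmetic behind that command is simple enough to script. Here is a sketch with a hypothetical worker_layout helper that gives each core its own worker and splits RAM evenly, matching the numbers above. It reserves no headroom for the operating system, which you may want to leave in practice.

```python
import os

def worker_layout(total_ram_gb, n_cores=None):
    """Hypothetical helper: one worker per core, RAM split evenly.

    Falls back to os.cpu_count() when n_cores is not given.
    """
    n_workers = n_cores or os.cpu_count() or 1
    per_worker_gb = total_ram_gb / n_workers
    return n_workers, per_worker_gb

# The numbers from the list above: 40 GB of RAM and 4 cores.
n_workers, per_worker_gb = worker_layout(40, 4)

# Build the matching dask-worker command line.
cmd = (
    "dask-worker tcp://127.0.0.1:8786 "
    f"--nprocs {n_workers} --memory-limit {per_worker_gb:g}GB"
)
print(cmd)
```

If you do create the cluster from Python instead, the equivalent sizing is LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="10GB"), where memory_limit applies per worker. In recent releases of distributed, the --nprocs flag is deprecated in favor of --nworkers.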

Conclusion

Start simple, and scale up gradually. At Saturn, we can give you 4-terabyte instances for all your LocalCluster needs. And if that’s not enough, you can run the same code on a multi-node (and multi-GPU) cluster with a few clicks. Happy Dask-ing!