Working with IPython and dask.distributed

dask.distributed is a cool library for distributed execution. If you haven’t already, you should check it out.

In many cases, dask.distributed should replace IPython Parallel if you primarily use the LoadBalancedView.
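For a sense of the correspondence between the two APIs, here is a minimal sketch of the same load-balanced map written both ways (using the rc and dask_client objects constructed below):

def double(x):
    return 2 * x

# IPython Parallel: load-balanced scheduling via a LoadBalancedView
lview = rc.load_balanced_view()
results = lview.map_sync(double, range(10))

# dask.distributed: the equivalent map/gather
futures = dask_client.map(double, range(10))
results = dask_client.gather(futures)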

However, you may already have infrastructure for deploying and managing IPython engines, and IPython Parallel’s interactive debugging features can still be useful.

Any IPython cluster can become a dask cluster at any time, and can be used simultaneously via both APIs.

You can turn your IPython cluster into a distributed cluster by calling Client.become_dask():

[1]:
import ipyparallel as ipp

cluster = ipp.Cluster()
n = 4
await cluster.start_cluster(n)
rc = await cluster.connect_client()
rc.wait_for_engines(n)
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
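(As an aside, recent ipyparallel releases can bundle start, connect, and wait into a single call; a sketch, assuming ipyparallel 7 or later:)

import ipyparallel as ipp

# start a local cluster with 4 engines and block until all have registered
rc = ipp.Cluster(n=4).start_and_connect_sync()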
[2]:
dask_client = rc.become_dask(ncores=1)
dask_client
[2]:
Client: Client-4de93880-0b0d-11ec-b993-784f4385c030
    Connection method: Direct
    Dashboard: http://192.168.1.31:53263/status

Scheduler: Scheduler-faed09d3-e79f-4698-9162-b38cece51ddf
    Comm: tcp://192.168.1.31:53264
    Workers: 2, Total threads: 2, Total memory: 8.00 GiB

Workers:
    tcp://192.168.1.31:53626: 1 thread, 4.00 GiB memory
    tcp://192.168.1.31:53627: 1 thread, 4.00 GiB memory

This will:

  1. start a Scheduler on the Hub

  2. start a Worker on each engine

  3. return a Client, the distributed client API

By default, distributed Workers use threads to run on all the cores of a machine. In this case, since I already have one engine per core, I tell distributed to use one thread per Worker with ncores=1.
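If you want to verify the resulting layout, the distributed Client can report it; a minimal sketch using Client.scheduler_info():

# how many workers registered, and how many threads each one runs
info = dask_client.scheduler_info()
for addr, worker in info["workers"].items():
    # newer distributed releases report "nthreads"; older ones used "ncores"
    print(addr, worker.get("nthreads", worker.get("ncores")))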

We can now use our IPython cluster with distributed:

[3]:
from distributed import progress


def square(x):
    return x ** 2


def neg(x):
    return -x


A = dask_client.map(square, range(1000))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)
[4]:
total.result()
[4]:
-332833500
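The engines remain ordinary IPython engines while they serve as dask Workers, so the same cluster really can be driven through both APIs at once. A quick sketch:

import os

# IPython Parallel: ask every engine for its process id
dview = rc[:]
engine_pids = dview.apply_sync(os.getpid)

# dask.distributed: submit work to the same processes
future = dask_client.submit(os.getpid)
print(engine_pids, future.result())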

I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.

First, I need to get a mapping of one engine per host:

[5]:
import socket

engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts
[5]:
{0: 'touchy', 1: 'touchy', 2: 'touchy', 3: 'touchy'}

I can reverse this mapping, to get a list of engines on each host:

[6]:
host_engines = {}
for engine_id, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(engine_id)

host_engines
[6]:
{'touchy': [0, 1, 2, 3]}
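The same reversal reads a little more cleanly with collections.defaultdict, which avoids the explicit membership check:

from collections import defaultdict

# map each host to the list of engine ids running there
host_engines = defaultdict(list)
for engine_id, host in engine_hosts.items():
    host_engines[host].append(engine_id)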

Now I can get one engine per host:

[7]:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host
[7]:
[0]

Here’s a more concise, but more opaque, version that does the same thing. Note that it may select a different engine on each host, since later engines overwrite earlier ones in the dict comprehension:

[8]:
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())
one_engine_per_host
[8]:
[3]

I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:

[9]:
rc.stop_distributed()

dask_client = rc.become_dask(one_engine_per_host)
dask_client
[9]:
Client: Client-4f4545ca-0b0d-11ec-b993-784f4385c030
    Connection method: Direct
    Dashboard: http://192.168.1.31:54310/status

Scheduler: Scheduler-ecb384f5-10bb-4d53-9e4c-67db7ff07d03
    Comm: tcp://192.168.1.31:54313
    Workers: 1, Total threads: 1, Total memory: 4.00 GiB

Workers:
    tcp://192.168.1.31:54344: 1 thread, 4.00 GiB memory

And submit the same tasks again:

[10]:
A = dask_client.map(square, range(100))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)

Debugging distributed with IPython

[11]:
rc.stop_distributed()

dask_client = rc.become_dask(one_engine_per_host)
dask_client
[11]:
Client: Client-4f8be14c-0b0d-11ec-b993-784f4385c030
    Connection method: Direct
    Dashboard: http://192.168.1.31:54467/status

Scheduler: Scheduler-9de91656-970d-405d-85ce-7e065b96c0c8
    Comm: tcp://192.168.1.31:54478
    Workers: 0, Total threads: 0, Total memory: 0 B (no workers had registered yet when this repr was captured)

Let’s set the %px magics to only run on our one engine per host:

[12]:
view = rc[one_engine_per_host]
view.block = True
view.activate()
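Now bare %px statements run only on those engines; for example (a sketch):

%px import os
%px print(os.getpid())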

Let’s submit some work that’s going to fail somewhere in the middle:

[13]:
from IPython.display import display
from distributed import progress


def shift5(x):
    return x - 5


def inverse(x):
    return 1 / x


shifted = dask_client.map(shift5, range(1, 10))
inverted = dask_client.map(inverse, shifted)

total = dask_client.submit(sum, inverted)
display(progress(total))
total.result()
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py in <module>
     16 total = dask_client.submit(sum, inverted)
     17 display(progress(total))
---> 18 total.result()

~/conda/lib/python3.9/site-packages/distributed/client.py in result(self, timeout)
    231         if self.status == "error":
    232             typ, exc, tb = result
--> 233             raise exc.with_traceback(tb)
    234         elif self.status == "cancelled":
    235             raise result

/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py in inverse()
      8
      9 def inverse(x):
---> 10     return 1 / x
     11
     12

ZeroDivisionError: division by zero

We can see which task failed:

[14]:
[f for f in inverted if f.status == "error"]
[14]:
[<Future: error, key: inverse-4f03d874c328aca27052e4758dcf2423>]
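Because Client.map preserves input order, the futures line up one-to-one with their inputs, so we can also recover which input triggered the failure:

# pair each original input with its future from the second map
failed_inputs = [x for x, f in zip(range(1, 10), inverted) if f.status == "error"]
failed_inputs  # [5]: shift5(5) == 0, and inverse(0) divides by zero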

When IPython starts a worker on each engine, it stores it in the dask_worker variable in the engine’s namespace. This lets us query the worker interactively.

We can check out the current data resident on each worker:

[15]:
%%px
dask_worker.data
Out[3:1]: Buffer<<LRU: 440/2576980377 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /Users/minrk/dev/ip/parallel/docs/source/examples/dask-worker-space/worker-e3zdjtzn/storage, mode="a", 0 elements>>>

Now that we can poke around with each Worker, we can have a slightly easier time figuring out what went wrong.
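distributed can also replay a failed task in the local process, where the usual IPython debugging tools apply; a sketch using Client.recreate_error_locally:

# re-run the failing task here; this raises the original ZeroDivisionError
# locally, after which the %debug magic drops into the offending frame
dask_client.recreate_error_locally(total)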