Working with IPython and dask.distributed
dask.distributed is a cool library for doing distributed execution. Check it out if you haven't already.
In many cases, dask.distributed can replace IPython Parallel, especially if you primarily use the LoadBalancedView.
However, you may already have infrastructure for deploying and managing IPython engines, and IPython Parallel’s interactive debugging features can still be useful.
Any IPython cluster can become a dask cluster at any time, and be used simultaneously via both APIs.
You can turn your IPython cluster into a distributed cluster by calling Client.become_dask():
import ipyparallel as ipp
rc = ipp.Cluster(n=4).start_and_connect_sync()
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
dask_client = rc.become_dask(ncores=1)
dask_client
Client-4de93880-0b0d-11ec-b993-784f4385c030 (connection: Direct)
Dashboard: http://192.168.1.31:53263/status
Scheduler: tcp://192.168.1.31:53264 (2 workers, 2 total threads, 8.00 GiB total memory)
Workers: tcp://192.168.1.31:53626 and tcp://192.168.1.31:53627 (1 thread, 4.00 GiB memory each)
This will:
- start a Scheduler on the Hub
- start a Worker on each engine
- return a Client, the distributed client API
By default, distributed Workers use threads to run on all cores of a machine.
In this case, since I already have one engine per core,
I tell distributed to run a single thread per Worker with ncores=1.
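If you want to double-check the thread allocation, you can ask the scheduler. A minimal sketch, assuming a recent distributed release where the per-worker info dict uses the nthreads key (older releases used ncores):
info = dask_client.scheduler_info()  # snapshot of scheduler state
{addr: w["nthreads"] for addr, w in info["workers"].items()}  # threads per worker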
We can now use our IPython cluster with distributed:
from distributed import progress
def square(x):
return x**2
def neg(x):
return -x
A = dask_client.map(square, range(1000))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)
total.result()
-332833500
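Because the engines are still ordinary IPython engines, both APIs can drive the same processes at once. A minimal sketch, reusing rc and dask_client from above:
import os
rc[:].apply_sync(os.getpid)  # IPython Parallel: run on every engine
dask_client.submit(os.getpid).result()  # dask: run on whichever worker is free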
I could also let distributed do its multithreading thing and run one multi-threaded Worker per host.
First, I need to get a mapping of one engine per host:
import socket
engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts
{0: 'touchy', 1: 'touchy', 2: 'touchy', 3: 'touchy'}
I can reverse this mapping, to get a list of engines on each host:
host_engines = {}
for engine_id, host in engine_hosts.items():
if host not in host_engines:
host_engines[host] = []
host_engines[host].append(engine_id)
host_engines
{'touchy': [0, 1, 2, 3]}
Now I can get one engine per host:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host
[0]
Here’s a concise, but more opaque version that does the same thing:
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())
one_engine_per_host
[3]
I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:
rc.stop_distributed()
dask_client = rc.become_dask(one_engine_per_host)
dask_client
Client-4f4545ca-0b0d-11ec-b993-784f4385c030 (connection: Direct)
Dashboard: http://192.168.1.31:54310/status
Scheduler: tcp://192.168.1.31:54313 (1 worker, 1 total thread, 4.00 GiB total memory)
Worker: tcp://192.168.1.31:54344 (1 thread, 4.00 GiB memory)
And submit the same tasks again:
A = dask_client.map(square, range(100))
B = dask_client.map(neg, A)
total = dask_client.submit(sum, B)
progress(total)
Debugging distributed with IPython
rc.stop_distributed()
dask_client = rc.become_dask(one_engine_per_host)
dask_client
Client-4f8be14c-0b0d-11ec-b993-784f4385c030 (connection: Direct)
Dashboard: http://192.168.1.31:54467/status
Scheduler: tcp://192.168.1.31:54478 (no workers had connected yet when this repr was captured)
Let’s set the %px magics to only run on our one engine per host:
view = rc[one_engine_per_host]
view.block = True
view.activate()
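After activate(), the %px magics target this view, so we can quickly confirm which engines they hit:
%px import socket
%px socket.gethostname()  # runs only on one_engine_per_host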
Let’s submit some work that’s going to fail somewhere in the middle:
from distributed import progress
from IPython.display import display
def shift5(x):
return x - 5
def inverse(x):
return 1 / x
shifted = dask_client.map(shift5, range(1, 10))
inverted = dask_client.map(inverse, shifted)
total = dask_client.submit(sum, inverted)
display(progress(total))
total.result()
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py in <module>
16 total = dask_client.submit(sum, inverted)
17 display(progress(total))
---> 18 total.result()
~/conda/lib/python3.9/site-packages/distributed/client.py in result(self, timeout)
231 if self.status == "error":
232 typ, exc, tb = result
--> 233 raise exc.with_traceback(tb)
234 elif self.status == "cancelled":
235 raise result
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_14739/3426399645.py in inverse()
8
9 def inverse(x):
---> 10 return 1 / x
11
12
ZeroDivisionError: division by zero
We can see which task failed:
[f for f in inverted if f.status == "error"]
[<Future: error, key: inverse-4f03d874c328aca27052e4758dcf2423>]
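We can also pull the remote exception and traceback back to the client with the standard distributed Future methods:
failed = [f for f in inverted if f.status == "error"][0]
failed.exception()  # the remote ZeroDivisionError instance
failed.traceback()  # traceback object from the worker, usable with the traceback module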
When IPython starts a worker on each engine,
it stores it in the engine's namespace as distributed_worker
(also reachable as dask_worker, which we use below).
This lets us query the worker interactively.
We can check out the current data resident on each worker:
%%px
dask_worker.data # noqa
Out[3:1]: Buffer<<LRU: 440/2576980377 on dict>, <Func: serialize_bytelist<->deserialize_bytes <File: /Users/minrk/dev/ip/parallel/docs/source/examples/dask-worker-space/worker-e3zdjtzn/storage, mode="a", 0 elements>>>
Now that we can poke around inside each Worker, figuring out what went wrong gets a little easier.
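For instance, listing the task keys each worker currently holds is a quick way to see where results live, using the same dask_worker handle:
%%px
list(dask_worker.data.keys())  # task keys currently stored on this worker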