Working with IPython and dask.distributed

dask.distributed is a cool library for distributed execution. You should check it out if you haven’t already.

Assuming you already have an IPython cluster running:
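
If you don’t already have one, a simple local cluster can typically be started from the command line (adjust -n to the number of engines you want):

ipcluster start -n 8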

[1]:
import ipyparallel as ipp
rc = ipp.Client()
rc.ids
[1]:
[0, 1, 2, 3, 4, 5, 6, 7]

You can turn your IPython cluster into a distributed cluster by calling Client.become_dask():

[2]:
executor = rc.become_dask(ncores=1)
executor
[2]:
<Executor: scheduler="172.16.3.46:52245" processes=9 cores=9>

This will:

  1. start a Scheduler on the Hub

  2. start a Worker on each engine

  3. return an Executor, the distributed client API

By default, distributed Workers will use threads to run on all cores of a machine. In this case, since I already have one engine per core, I tell distributed to use one core per Worker with ncores=1.

We can now use our IPython cluster with distributed:

[3]:
from distributed import progress

def square(x):
    return x ** 2

def neg(x):
    return -x

A = executor.map(square, range(1000))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)
[4]:
total.result()
[4]:
-332833500
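
As a sanity check, the closed form agrees: the sum of x**2 for x in 0..999 is 999 * 1000 * 1999 / 6 = 332,833,500, so the negated total should be -332833500. The same computation done locally:

sum(-(x ** 2) for x in range(1000))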

I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.

First, I need to get a mapping of one engine per host:

[5]:
import socket

engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts
[5]:
{0: 'k5.simula.no',
 1: 'k5.simula.no',
 2: 'k5.simula.no',
 3: 'k5.simula.no',
 4: 'k5.simula.no',
 5: 'k5.simula.no',
 6: 'k5.simula.no',
 7: 'k5.simula.no'}

I can reverse this mapping, to get a list of engines on each host:

[6]:
host_engines = {}
for engine_id, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(engine_id)

host_engines
[6]:
{'k5.simula.no': [0, 1, 2, 3, 4, 5, 6, 7]}

Now I can get one engine per host:

[7]:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host
[7]:
[0]

Here’s a concise, but more opaque version that does the same thing (it may pick a different engine on each host than the loop above, but any one engine per host will do):

[8]:
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())
one_engine_per_host
[8]:
[7]

I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:

[9]:
rc.stop_distributed()

executor = rc.become_dask(one_engine_per_host)
executor
distributed.executor - INFO - Reconnecting...
[9]:
<Executor: scheduler="172.16.3.46:59120" processes=1 cores=1>

And submit the same tasks again:

[10]:
A = executor.map(square, range(100))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)

Debugging distributed with IPython

[11]:
rc.stop_distributed()

executor = rc.become_dask(one_engine_per_host)
executor
distributed.executor - INFO - Reconnecting...
[11]:
<Executor: scheduler="172.16.3.46:59142" processes=1 cores=1>

Let’s set the %px magics to only run on our one engine per host:

[12]:
view = rc[one_engine_per_host]
view.block = True
view.activate()
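
As a quick check, %px should now run only on that single engine. A minimal sketch (the hostname and process id will vary):

%px import os, socket; print(socket.gethostname(), os.getpid())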

Let’s submit some work that’s going to fail somewhere in the middle:

[13]:
from IPython.display import display
from distributed import progress

def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

shifted = executor.map(shift5, range(1, 10))
inverted = executor.map(inverse, shifted)

total = executor.submit(sum, inverted)
display(progress(total))
total.result()
---------------------------------------------------------------------------
ZeroDivisionError                         Traceback (most recent call last)
<ipython-input-13-183a85878b6a> in <module>()
     13 total = executor.submit(sum, inverted)
     14 display(progress(total))
---> 15 total.result()

/Users/benjaminrk/dev/py/distributed/distributed/executor.py in result(self)
    100         result = sync(self.executor.loop, self._result, raiseit=False)
    101         if self.status == 'error':
--> 102             six.reraise(*result)
    103         if self.status == 'cancelled':
    104             raise result

/Users/benjaminrk/conda/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    683             value = tp()
    684         if value.__traceback__ is not tb:
--> 685             raise value.with_traceback(tb)
    686         raise value
    687

<ipython-input-13-183a85878b6a> in inverse()
      6
      7 def inverse(x):
----> 8     return 1 / x
      9
     10 shifted = executor.map(shift5, range(1, 10))

ZeroDivisionError: division by zero

We can see which task failed:

[14]:
[f for f in inverted if f.status == 'error']
[14]:
[<Future: status: error, key: inverse-f8907aa30adc310cc8168553500ca8bb>]

When IPython starts a worker on each engine, it stores it in the dask_worker variable in the engine’s namespace. This lets us query the worker interactively.

We can check out the current data resident on each worker:

[15]:
%%px
dask_worker.data
Out[7:2]: 
{'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,
 'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,
 'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,
 'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,
 'inverse-8ef20ef722160668e84ab435b8293751': -0.5,
 'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,
 'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,
 'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,
 'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,
 'shift5-3035396f215ce921eda38f8f36ca3e90': 4,
 'shift5-4951afd99368d41997f42a2f823f566f': 2,
 'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,
 'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,
 'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,
 'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,
 'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,
 'shift5-e233bf13876414d6a0a4817695ac7ca1': 3}

Now that we can poke around with each Worker, we can have a slightly easier time figuring out what went wrong.
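
For example, since dask_worker.data maps task keys to their values, a %%px cell along these lines (a sketch) can locate the input that made inverse blow up, i.e. the shift5 result equal to zero:

%%px
[key for key, value in dask_worker.data.items()
 if key.startswith('shift5') and value == 0]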