Futures in IPython Parallel#

The IPython Parallel AsyncResult object extends concurrent.futures.Future, which makes it compatible with most async frameworks in Python.

import ipyparallel as ipp
rc = ipp.Cluster(n=4).start_and_connect_sync()

dv = rc[:]
dv.activate()
dv
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
<DirectView [0, 1, 2, 3]>

Do some imports everywhere

%%px --local --block
import os
import time
import numpy
from numpy.linalg import norm
def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.random.random(n)
    return norm(A, 2)

The basic async API hasn’t changed:

f = rc[-1].apply(random_norm, 100)
f
<AsyncResult: random_norm>
f.get()
5.854015134508366

But the full Futures API is now available:

f.result()
5.854015134508366
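
Since AsyncResult really is a Future subclass, the rest of the standard Future interface behaves as you would expect. A quick check on the result we already have:

import concurrent.futures

isinstance(f, concurrent.futures.Future)  # True: AsyncResult subclasses Future
f.done()       # True, since the result has already arrived
f.exception()  # None, because random_norm succeeded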

The standard Futures API also has methods for registering callbacks, checking completion, and so on.

import os
f = rc[-1].apply(os.getpid)
f.add_done_callback(lambda _: print("I got PID: %i" % _.result()))
f.result()
I got PID: 12509
12509

A more complex example shows how AsyncResults can be integrated into existing async applications, now that they are Futures:

import asyncio
from tornado.ioloop import IOLoop
import sys

def sleep_task(t):
    time.sleep(t)
    return os.getpid()

async def background():
    """A backgorund coroutine to demonstrate that we aren't blocking"""
    while True:
        await asyncio.sleep(1)
        print('.', end=' ')
        sys.stdout.flush() # not needed after ipykernel 4.3

async def work():
    """Submit some work and print the results when complete"""
    for t in [ 1, 2, 3, 4 ]:
        ar = rc[:].apply(sleep_task, t)
        result = await asyncio.wrap_future(ar) # this waits
        print(result)

bg = asyncio.ensure_future(background())
await work()
bg.cancel();
. [12507, 12506, 12508, 12509]
. . [12507, 12506, 12508, 12509]
. . . [12507, 12506, 12508, 12509]
. . . . [12507, 12506, 12508, 12509]

So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
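
Because the wrapped results are ordinary awaitables, you can also submit everything up front and wait for the results concurrently with standard tools such as asyncio.gather. A small variation on the example above:

async def work_concurrently():
    """Submit all tasks first, then wait for every result at once"""
    futures = [
        asyncio.wrap_future(rc[:].apply(sleep_task, t))
        for t in [1, 2, 3, 4]
    ]
    # asyncio.gather waits on all of the wrapped AsyncResults together
    for result in await asyncio.gather(*futures):
        print(result)

await work_concurrently()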

Executors#

Executor is a standard Python API (defined in concurrent.futures) that many job-submission tools provide. Having different libraries expose this one common API for asynchronous execution is useful because implementations can easily be swapped out for each other and compared, and the best one for a given context can be used without changing any other code.
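
Concretely, code written against the Executor interface only depends on a couple of methods (submit, map, shutdown), so the same helper can be handed a thread pool, a process pool, an IPython Parallel view, or a Dask client without modification. A minimal sketch (the helper name is made up for illustration):

from concurrent.futures import Executor, ThreadPoolExecutor

def sum_of_norms(executor: Executor, sizes):
    """Compute random_norm for each size with whatever Executor we are given"""
    # executor.map is the only part of the API this helper relies on
    return sum(executor.map(random_norm, sizes))

# any Executor implementation can be dropped in here
print(sum_of_norms(ThreadPoolExecutor(4), [10, 100, 1000]))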

With IPython Parallel, every View has an .executor property, which provides the Executor API for that View. Just like the Views themselves, how an Executor assigns work depends on the View from which it was created.

You can get an Executor for any View by accessing View.executor:

ex_all = rc[:].executor
ex_all.view.targets
[0, 1, 2, 3]
even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)
12507
12508
12507
12508
12507
12508
12507
12508
12507
12508

Typically, though, one will want an Executor for a LoadBalancedView on all the engines. This is what the top-level Client.executor() method will return:

ex = rc.executor()
ex.view
<LoadBalancedView None>
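
The load-balanced Executor can be used like any other: submit returns a Future, and the scheduler assigns each task to whichever engine is free. For example:

# submit a single task to the load-balanced scheduler
f = ex.submit(random_norm, 100)
print(f.result())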

Next, let’s create a few compatible Executor instances and compare them.

Comparing Executors#

As an aside: dask.distributed is a great library, and any IPython Parallel cluster can be bootstrapped into a Dask cluster, so we will include a Dask Executor in our comparison.

There can be serialization differences between implementations, especially for interactively defined functions (i.e. those defined in the notebook itself). That’s why we define our task function in a local module rather than here: ProcessPoolExecutor doesn’t serialize interactively defined functions at all. For the most part, though, functions defined in modules work consistently across implementations.
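
A quick illustration of why (not part of the original comparison): ProcessPoolExecutor relies on the stdlib pickle, which cannot ship interactively defined functions to its worker processes (a lambda will not even pickle), while functions that live in an importable module are pickled by reference:

import pickle

# an interactively defined function (here, a lambda) does not pickle,
# which is why ProcessPoolExecutor can't run it
try:
    pickle.dumps(lambda n: n + 1)
except Exception as e:
    print("can't pickle an interactive lambda:", e)

# a function defined in an importable module pickles by reference,
# so every Executor implementation can use it -- hence task_mod.py below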

%pycat task_mod.py
import numpy
from numpy.linalg import norm

def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.ones(n)
    return norm(A, 2)

from task_mod import task

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import distributed

distributed_client = rc.become_dask()
dist_ex = distributed_client.get_executor()

N = 4
ip_ex = rc.executor(targets=range(N))
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)
executors = [process_ex, thread_ex, ip_ex, dist_ex]

We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:

for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))

This makes it easy to compare the different implementations. We are going to submit some dummy work: allocating arrays of various sizes and computing their 2-norms.

sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)
sizes
array([ 1048576,  1261463,  1517571,  1825676,  2196334,  2642245,
        3178688,  3824041,  4600417,  5534417,  6658042,  8009791,
        9635980, 11592325, 13945857, 16777216])

Run the work locally, to get a reference:

print("Local time:")
%time ref = list(map(task, sizes))
Local time:
CPU times: user 765 ms, sys: 403 ms, total: 1.17 s
Wall time: 874 ms

And then run again with the various Executors:

for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time a full map over all of the sizes
    %timeit list(executor.map(task, sizes))
ProcessPoolExecutor
246 ms ± 86 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ThreadPoolExecutor
182 ms ± 32.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ViewExecutor
228 ms ± 24.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ClientExecutor
246 ms ± 27.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

For this toy work, the stdlib ThreadPoolExecutor appears to perform best. That’s useful information, and it is likely to hold for most workloads that release the GIL and fit comfortably into memory. When the GIL is involved, ProcessPoolExecutor is often the better choice for simple workloads.

One benefit of the IPython Parallel and Distributed Executors over the stdlib Executors is that they are not confined to a single machine. The standard Executor API therefore lets you develop small-scale parallel tools that run locally in threads or processes, and then extend the exact same code to multiple machines, just by selecting a different Executor.
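
For instance, reusing the hypothetical sum_of_norms helper sketched earlier, the same call scales from local threads to the whole cluster just by changing which Executor is passed in:

# during development: run locally in threads
local_total = sum_of_norms(ThreadPoolExecutor(4), sizes)
# in production: run across all engines, with no other changes
cluster_total = sum_of_norms(rc.executor(), sizes)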

That seems pretty useful. joblib is another package that implements a standardized API for parallel backends, and IPython Parallel supports it as well.
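
For example (a sketch, assuming ipyparallel’s documented joblib integration; register_joblib_backend and the "ipyparallel" backend name are taken from its documentation): the running cluster can be registered as a joblib backend and then used through joblib.Parallel:

import joblib
import ipyparallel as ipp

# register the cluster's engines as a joblib backend named "ipyparallel"
ipp.register_joblib_backend()

with joblib.parallel_backend("ipyparallel"):
    norms = joblib.Parallel(n_jobs=4)(joblib.delayed(task)(n) for n in sizes)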