Futures in IPython Parallel

New in IPython Parallel 5.0: the AsyncResult object is now a Future (specifically, a subclass of concurrent.futures.Future).

This means it can be integrated into any Future-using application.

[1]:
import ipyparallel as ipp
rc = ipp.Client()
dv = rc[:]
dv
[1]:
<DirectView [0, 1, 2, 3]>

Do some imports everywhere

[2]:
%%px --local
import os
import time
import numpy
from numpy.linalg import norm
[3]:
def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.random.random(n)
    return norm(A, 2)

The basic async API hasn’t changed:

[4]:
f = rc[-1].apply(random_norm, 100)
f
[4]:
<AsyncResult: random_norm>
[5]:
f.get()
[5]:
5.9875490723743265

But the familiar AsyncResult object is now a Future:

[6]:
f.__class__.mro()
[6]:
[ipyparallel.client.asyncresult.AsyncResult,
 concurrent.futures._base.Future,
 object]

This means that we can use Future APIs to access results, etc.

[7]:
f.result()
[7]:
5.9875490723743265
[8]:
import os
f = rc[-1].apply(os.getpid)
f.add_done_callback(lambda fut: print("I got PID: %i" % fut.result()))
f.get()
I got PID: 7892
[8]:
7892
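
Both result() and add_done_callback() are defined by concurrent.futures.Future itself, so code written against them does not depend on where the Future came from. The following is a minimal sketch that uses a stdlib ThreadPoolExecutor as a stand-in for an engine, so it runs without a cluster; the names `collected` and `record_pid` are illustrative and not part of IPython Parallel.

```python
import os
from concurrent.futures import Future, ThreadPoolExecutor

collected = []

def record_pid(future):
    """Done-callback: it receives the finished Future, not the bare result."""
    collected.append(future.result())

# A one-thread pool stands in for a single engine (assumption for this sketch).
with ThreadPoolExecutor(max_workers=1) as pool:
    f = pool.submit(os.getpid)
    f.add_done_callback(record_pid)
    pid = f.result()  # blocks until the task is done

# Leaving the `with` block joins the worker, so the callback has run by now.
print(isinstance(f, Future), collected == [pid])
```

Since the callback receives the Future, it has to call result() itself to get the value, just as the lambda in the cell above does.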

A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:

[9]:
from tornado.gen import coroutine, sleep
from tornado.ioloop import IOLoop
import sys

def sleep_task(t):
    time.sleep(t)
    return os.getpid()

@coroutine
def background():
    """A background coroutine to demonstrate that we aren't blocking"""
    while True:
        yield sleep(1)
        print('.', end=' ')
        sys.stdout.flush() # not needed after ipykernel 4.3

@coroutine
def work():
    """Submit some work and print the results when complete"""
    for t in [1, 2, 3, 4]:
        ar = rc[:].apply(sleep_task, t)
        result = yield ar # this waits
        print(result)

loop = IOLoop()
loop.add_callback(background)
loop.run_sync(work)
. [7885, 7886, 7891, 7892]
. . [7885, 7886, 7891, 7892]
. . . [7885, 7886, 7891, 7892]
. . . . [7885, 7886, 7891, 7892]

So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
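
The same pattern can be sketched with asyncio instead of tornado. This is only an illustration under stated assumptions: a stdlib ThreadPoolExecutor future stands in for an AsyncResult, and asyncio.wrap_future is used to make a concurrent.futures.Future awaitable. Whether an AsyncResult can be passed to wrap_future in exactly the same way is assumed here from its being a concurrent.futures.Future subclass, not verified against a cluster. The names `ticker`, `work`, and `main` are illustrative.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def sleep_task(t):
    """Blocking work that runs in a worker thread."""
    time.sleep(t)
    return t

async def ticker(ticks):
    """Background coroutine: keeps ticking while the work is pending."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(".")

async def work(pool):
    """Submit blocking tasks and await each one without blocking the loop."""
    results = []
    for t in [0.05, 0.1]:
        cf = pool.submit(sleep_task, t)  # a concurrent.futures.Future
        results.append(await asyncio.wrap_future(cf))
    return results

async def main():
    ticks = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        background = asyncio.create_task(ticker(ticks))
        results = await work(pool)
        background.cancel()
    return results, ticks

results, ticks = asyncio.run(main())
print(results, len(ticks) > 0)
```

asyncio.run starts its own event loop, so this form is meant for a plain script; inside a notebook, where a loop is already running, `await main()` would be the equivalent call.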

Executors

Executors are a standard Python API (concurrent.futures.Executor) provided by various job-submission tools. When different libraries expose this common API for asynchronous execution, their implementations can easily be swapped for one another and compared, and the best one for a given context can be used without changing the calling code.
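
As a small illustration of that interchangeability, the sketch below writes one function against the Executor API and runs it with two different implementations. `SerialExecutor` is a hypothetical toy class written for this example (it runs each task immediately in the calling thread); it is not part of the stdlib or of IPython Parallel.

```python
import math
from concurrent.futures import Executor, Future, ThreadPoolExecutor

class SerialExecutor(Executor):
    """Toy Executor (illustrative only): runs each task synchronously."""

    def submit(self, fn, *args, **kwargs):
        f = Future()
        try:
            f.set_result(fn(*args, **kwargs))
        except Exception as exc:
            f.set_exception(exc)
        return f

def run_all(executor, values):
    """Depends only on the Executor API, not on a particular implementation."""
    return list(executor.map(math.sqrt, values))

with SerialExecutor() as ex:
    serial = run_all(ex, [1, 4, 9])

with ThreadPoolExecutor(max_workers=2) as ex:
    threaded = run_all(ex, [1, 4, 9])

print(serial == threaded)
```

Only submit() is implemented here; the base Executor class builds map() on top of it, which is why `run_all` works unchanged with both.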

With IPython Parallel, every View has an .executor property, to provide the Executor API for the given View. Just like Views, the assignment of work for an Executor depends on the View from which it was created.

You can get an Executor for any View by accessing View.executor:

[10]:
ex_all = rc[:].executor
ex_all.view.targets
[10]:
[0, 1, 2, 3]
[11]:
even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)
7891
7885
7891
7891
7891
7891
7885
7885
7885
7885

Typically, though, one will want an Executor for a LoadBalancedView on all the engines. In that case, use the top-level Client.executor method:

[12]:
ex = rc.executor()
ex.view
[12]:
<LoadBalancedView None>

Comparing Executors

Let’s make a few Executors:

The distributed executor assumes you have started a distributed cluster on the default local interface, e.g.

$> dcluster 127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1
[20]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import distributed

class DistributedExecutor(distributed.Executor):
    """Wrap distributed.Executor to provide standard Executor.map API

    distributed.Executor.map returns list of Futures,
    not iterable of results, like everything else.

    See blaze/distributed#91
    """
    def map(self, *args, **kwargs):
        list_of_futures = super().map(*args, **kwargs)
        for f in list_of_futures:
            yield f.result()

N = 4
ip_ex = rc.executor(targets=range(N))
dist_ex = DistributedExecutor('127.0.0.1:8787')
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)

[21]:
executors = [process_ex, thread_ex, ip_ex, dist_ex]

We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:

[22]:
for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))

ProcessPoolExecutor
['0', '1', '2', '3', '4']
ThreadPoolExecutor
['0', '1', '2', '3', '4']
ViewExecutor
['0', '1', '2', '3', '4']
DistributedExecutor
['0', '1', '2', '3', '4']

This makes it easy to compare the different implementations. We are going to submit some dummy work: allocating arrays of various sizes and computing their 2-norms.

[23]:
def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.ones(n)
    return norm(A, 2)
[24]:
sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)
sizes
[24]:
array([ 1048576,  1261463,  1517571,  1825676,  2196334,  2642245,
        3178688,  3824041,  4600417,  5534417,  6658042,  8009791,
        9635980, 11592325, 13945857, 16777216])

Run the work locally, to get a reference:

[25]:
print("Local time:")
%time ref = list(map(task, sizes))
Local time:
CPU times: user 161 ms, sys: 404 ms, total: 565 ms
Wall time: 560 ms

And then run again with the various Executors:

[26]:
for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time submitting the work and collecting all the results
    %timeit list(executor.map(task, sizes))
ProcessPoolExecutor
10 loops, best of 3: 126 ms per loop
ThreadPoolExecutor
10 loops, best of 3: 149 ms per loop
ViewExecutor
10 loops, best of 3: 151 ms per loop
DistributedExecutor
10 loops, best of 3: 141 ms per loop
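
%timeit is an IPython magic, so the loop above only runs inside IPython. Outside a notebook, a comparable measurement can be approximated with time.perf_counter. The helper below is a rough sketch, not a replacement for %timeit: `time_map` and its `repeat` parameter are illustrative names, and it reports only the best of a few runs with no loop-count calibration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def time_map(executor, fn, items, repeat=3):
    """Return the best wall-clock time (seconds) of `repeat` executor.map runs."""
    best = float("inf")
    for _ in range(repeat):
        start = time.perf_counter()
        list(executor.map(fn, items))  # consume the iterator to wait for results
        best = min(best, time.perf_counter() - start)
    return best

with ThreadPoolExecutor(max_workers=4) as ex:
    elapsed = time_map(ex, abs, range(100))

print("best of 3: %.3f ms" % (elapsed * 1e3))
```

Since the helper takes the Executor as an argument, the same call could be repeated for each entry in a list of Executors.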

For this toy work, the stdlib ProcessPoolExecutor appears to perform the best (though in testing, it seems to crash quite a bit). That's useful information. One benefit of the IPython Parallel and Distributed Executors over the stdlib Executors is that they are not confined to a single machine. This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes, and then extend the exact same code to use multiple machines, just by selecting a different Executor.

That seems pretty useful.