# Futures in IPython Parallel

New in IPython Parallel 5.0: the AsyncResult object is now a Future (specifically, a subclass of concurrent.futures.Future).

This means it can be integrated into any Future-using application.

[1]:

import ipyparallel as ipp
rc = ipp.Client()
dv = rc[:]
dv

[1]:

<DirectView [0, 1, 2, 3]>


Do some imports everywhere

[2]:

%%px --local
import os
import time
import numpy
from numpy.linalg import norm

[3]:

def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.random.random(n)
    return norm(A, 2)



The basic async API hasn’t changed:

[4]:

f = rc[-1].apply(random_norm, 100)
f

[4]:

<AsyncResult: random_norm>

[5]:

f.get()

[5]:

5.9875490723743265


But the familiar AsyncResult object is now a Future:

[6]:

f.__class__.mro()

[6]:

[ipyparallel.client.asyncresult.AsyncResult,
concurrent.futures._base.Future,
object]


This means that we can use Future APIs to access results, etc.

[7]:

f.result()

[7]:

5.9875490723743265

[8]:

import os
f = rc[-1].apply(os.getpid)
f.add_done_callback(lambda _: print("I got PID: %i" % _.result()))
f.get()

I got PID: 7892

[8]:

7892
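
Because an AsyncResult is a concurrent.futures.Future, the module-level helpers in concurrent.futures (wait, as_completed) should accept it as well. The following is a minimal sketch of those helpers, not a transcript from a cluster: a ThreadPoolExecutor stands in for the engines so that the example runs without IPython Parallel, and `slow_square` is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def slow_square(x):
    """Hypothetical task: sleep briefly, then return x squared."""
    time.sleep(0.01 * x)
    return x * x


# Assumption: ThreadPoolExecutor stands in for an IPython Parallel view.
# Its submit() returns plain concurrent.futures.Future objects.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_square, x) for x in range(4)]

    # Block until every Future has finished.
    done, not_done = wait(futures)

    # Iterate over Futures as they finish; completion order is not
    # guaranteed, so sort before comparing.
    completed = sorted(f.result() for f in as_completed(futures))

# Reading from the original list gives results in submission order.
squares = [f.result() for f in futures]
print(squares)
```

With a running cluster, the list of Futures would instead come from calls such as `rc[-1].apply(...)`.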


A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:

[9]:

from tornado.gen import coroutine, sleep
from tornado.ioloop import IOLoop
import sys

def sleep_task(t):  # function name assumed; any function with this body works
    time.sleep(t)
    return os.getpid()

@coroutine
def background():
    """A background coroutine to demonstrate that we aren't blocking"""
    while True:
        yield sleep(1)
        print('.', end=' ')
        sys.stdout.flush() # not needed after ipykernel 4.3

@coroutine
def work():
    """Submit some work and print the results when complete"""
    for t in [ 1, 2, 3, 4 ]:
        ar = rc[:].apply(sleep_task, t) # submit to all engines
        result = yield ar # this waits
        print(result)

loop = IOLoop()
loop.add_callback(background)
loop.run_sync(work)

. [7885, 7886, 7891, 7892]
. . [7885, 7886, 7891, 7892]
. . . [7885, 7886, 7891, 7892]
. . . . [7885, 7886, 7891, 7892]


So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
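
The same idea carries over to asyncio, where `asyncio.wrap_future` turns a concurrent.futures.Future into an awaitable. The sketch below is an illustration under assumptions rather than IPython Parallel's own recipe: a ThreadPoolExecutor supplies the Futures so that it runs without a cluster, and `sleep_task` and `work` are hypothetical names.

```python
import asyncio
import os
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_task(t):
    """Hypothetical blocking task, run outside the event loop."""
    time.sleep(t)
    return os.getpid()


async def work(pool):
    """Submit blocking work and await each result without blocking the loop."""
    results = []
    for t in [0.01, 0.02]:
        future = pool.submit(sleep_task, t)         # concurrent.futures.Future
        result = await asyncio.wrap_future(future)  # awaitable wrapper
        results.append(result)
    return results


def main():
    # Assumption: the thread pool stands in for an IPython Parallel view.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return asyncio.run(work(pool))


pids = main()
print(pids)
```

Threads share the PID of the calling process, so both entries here match the local PID; with engines behind the Futures, each result would report the PID of the engine that ran the task.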

## Executors

Executors are a standard Python API provided by various job-submission tools. When libraries expose the same Executor interface for asynchronous execution, their implementations can be swapped for one another and compared, and the best one for a given context can be chosen without changing the calling code.
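
To make the interface concrete, here is a minimal sketch of a custom Executor. `InlineExecutor` and `total_length` are hypothetical names invented for illustration. Subclassing concurrent.futures.Executor and implementing `submit` is enough, since the base class builds `map` on top of `submit`.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class InlineExecutor(Executor):
    """Hypothetical Executor that runs each task immediately in the caller."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the error so it is raised from future.result().
            future.set_exception(exc)
        return future


def total_length(executor, words):
    """Written against the Executor API, not a specific implementation."""
    return sum(executor.map(len, words))


words = ["ipython", "parallel", "futures"]

inline_total = total_length(InlineExecutor(), words)
with ThreadPoolExecutor(max_workers=2) as pool:
    threaded_total = total_length(pool, words)

print(inline_total, threaded_total)
```

`total_length` never needs to know which implementation it was given, which is the property that makes Executors easy to swap and compare.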

With IPython Parallel, every View has an .executor property, to provide the Executor API for the given View. Just like Views, the assignment of work for an Executor depends on the View from which it was created.

You can get an Executor for any View by accessing View.executor:

[10]:

ex_all = rc[:].executor
ex_all.view.targets

[10]:

[0, 1, 2, 3]

[11]:

even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)

7891
7885
7891
7891
7891
7891
7885
7885
7885
7885


Typically, though, one will want an Executor for a LoadBalancedView on all the engines. In that case, use the top-level Client.executor method:

[12]:

ex = rc.executor()
ex.view

[12]:

<LoadBalancedView None>


## Comparing Executors

Let’s make a few Executors:

The distributed executor assumes you have started a distributed cluster on the default local interface, e.g.

$> dcluster 127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1

[20]:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import distributed

class DistributedExecutor(distributed.Executor):
    """Wrap distributed.Executor to provide standard Executor.map API

    distributed.Executor.map returns list of Futures,
    not iterable of results, like everything else.

    See blaze/distributed#91
    """
    def map(self, *args, **kwargs):
        list_of_futures = super().map(*args, **kwargs)
        for f in list_of_futures:
            yield f.result()

N = 4
ip_ex = rc.executor(targets=range(N))
dist_ex = DistributedExecutor('127.0.0.1:8787')
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)


[21]:

executors = [process_ex, thread_ex, ip_ex, dist_ex]


We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:

[22]:

for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))


ProcessPoolExecutor
['0', '1', '2', '3', '4']
ThreadPoolExecutor
['0', '1', '2', '3', '4']
ViewExecutor
['0', '1', '2', '3', '4']
DistributedExecutor
['0', '1', '2', '3', '4']


This makes it easy to compare the different implementations. We are going to submit some dummy work—allocate and compute 2-norms of arrays of various sizes.

[23]:

def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.ones(n)
    return norm(A, 2)

[24]:

sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)
sizes

[24]:

array([ 1048576,  1261463,  1517571,  1825676,  2196334,  2642245,
3178688,  3824041,  4600417,  5534417,  6658042,  8009791,
9635980, 11592325, 13945857, 16777216])


Run the work locally, to get a reference:

[25]:

print("Local time:")
%time ref = list(map(task, sizes))

Local time:
CPU times: user 161 ms, sys: 404 ms, total: 565 ms
Wall time: 560 ms


And then run again with the various Executors:

[26]:

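for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time the task assignment
    %timeit list(executor.map(task, sizes))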

ProcessPoolExecutor
10 loops, best of 3: 126 ms per loop
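
Outside IPython, where the timing magics are not available, a comparison like this can be sketched with the standard-library timeit module. This is an illustration under assumptions, not the notebook's benchmark: `best_time` is a hypothetical helper, the task is a pure-Python stand-in so that the example needs no NumPy, and only a thread pool is timed.

```python
import math
import timeit
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """Pure-Python stand-in: 2-norm of a length-n vector of ones."""
    return math.sqrt(math.fsum(1.0 for _ in range(n)))


def best_time(executor, fn, items, repeat=3, number=1):
    """Hypothetical helper: best wall time in seconds for one full map."""
    timer = timeit.Timer(lambda: list(executor.map(fn, items)))
    return min(timer.repeat(repeat=repeat, number=number)) / number


sizes = [2 ** k for k in range(8, 12)]
ref = list(map(task, sizes))  # local reference results

with ThreadPoolExecutor(max_workers=4) as thread_ex:
    # Check correctness first, then time the same call.
    assert list(thread_ex.map(task, sizes)) == ref
    elapsed = best_time(thread_ex, task, sizes)
    print("%s: %.3f ms" % (type(thread_ex).__name__, elapsed * 1e3))
```

Any object with a standard `map` method, such as a ProcessPoolExecutor or the Executor returned by `rc.executor()`, could be passed to `best_time` in the same way.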