Futures in IPython Parallel
As of IPython Parallel 5.0, the AsyncResult object is a Future (specifically, a subclass of concurrent.futures.Future).
This means it can be integrated into any Future-using application.
import ipyparallel as ipp
rc = ipp.Client()
dv = rc[:]
dv
<DirectView [0, 1, 2, 3]>
Do some imports everywhere:
%%px --local
import os
import time
import numpy
from numpy.linalg import norm
def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.random.random(n)
    return norm(A, 2)
The basic async API hasn’t changed:
f = rc[-1].apply(random_norm, 100)
f
But the familiar AsyncResult object is now a Future, as its class hierarchy shows:
[ipyparallel.client.asyncresult.AsyncResult, concurrent.futures._base.Future, object]
This means that we can use Future APIs to access results, etc.
import os
f = rc[-1].apply(os.getpid)
f.add_done_callback(lambda _: print("I got PID: %i" % _.result()))
f.get()
I got PID: 7892
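The same Future methods can be tried without a running cluster. The sketch below uses a stdlib ThreadPoolExecutor as a stand-in for an engine (an assumption made for illustration; get_pid and record_pid are hypothetical names) and relies only on the concurrent.futures.Future API that AsyncResult inherits.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def get_pid():
    """Stand-in for work that would normally run on an engine."""
    return os.getpid()


seen = []


def record_pid(future):
    """Done-callback: receives the finished Future itself."""
    seen.append(future.result())


# Assumption: a single-thread pool plays the role of rc[-1].
with ThreadPoolExecutor(max_workers=1) as pool:
    f = pool.submit(get_pid)
    f.add_done_callback(record_pid)  # runs once the result is set
    pid = f.result()                 # blocks until the task finishes

print("I got PID: %i" % pid)
```

With an AsyncResult, f.get() and f.result() should both return the value; result() is the name that generic Future-consuming code expects.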
A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:
from tornado.gen import coroutine, sleep
from tornado.ioloop import IOLoop
import sys

def sleep_task(t):
    time.sleep(t)
    return os.getpid()

@coroutine
def background():
    """A background coroutine to demonstrate that we aren't blocking"""
    while True:
        yield sleep(1)
        print('.', end=' ')
        sys.stdout.flush() # not needed after ipykernel 4.3

@coroutine
def work():
    """Submit some work and print the results when complete"""
    for t in [ 1, 2, 3, 4 ]:
        ar = rc[:].apply(sleep_task, t)
        result = yield ar # this waits
        print(result)

loop = IOLoop()
loop.add_callback(background)
loop.run_sync(work)
. [7885, 7886, 7891, 7892]
. . [7885, 7886, 7891, 7892]
. . . [7885, 7886, 7891, 7892]
. . . . [7885, 7886, 7891, 7892]
So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
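The same idea can be sketched with the stdlib asyncio loop instead of tornado. This is a minimal illustration and not part of the original example: a ThreadPoolExecutor stands in for the engines, and asyncio.wrap_future adapts a concurrent.futures.Future so a coroutine can await it. That an AsyncResult can be wrapped the same way is an assumption based on it being a concurrent.futures.Future subclass. The names ticker, work and main are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_task(t):
    """Blocking work that runs in a pool thread, not on the event loop."""
    time.sleep(t)
    return t


async def ticker(ticks):
    """Background coroutine: keeps running while work() is waiting."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(".")


async def work(pool, durations):
    """Submit tasks one at a time and await each result."""
    results = []
    for t in durations:
        cf_future = pool.submit(sleep_task, t)
        # wrap_future turns a concurrent.futures.Future into an awaitable
        results.append(await asyncio.wrap_future(cf_future))
    return results


async def main():
    ticks = []
    with ThreadPoolExecutor(2) as pool:
        background = asyncio.create_task(ticker(ticks))
        results = await work(pool, [0.05, 0.1])
        background.cancel()
    return results, ticks


results, ticks = asyncio.run(main())
print(results, "ticks while waiting:", len(ticks))
```

The ticker keeps appending while the tasks sleep, which shows that awaiting the wrapped Future does not block the event loop.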
Executors are a standard Python API provided by various job-submission tools. When different libraries expose this common API for asynchronous execution, implementations can easily be swapped for one another and compared, or the best one for a given context can be chosen, without changing the calling code.
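As a rough illustration of that swap-ability, the sketch below writes one function against the Executor interface and runs it with two different implementations. SerialExecutor and sum_of_squares are hypothetical names invented for this example; only concurrent.futures from the standard library is used.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class SerialExecutor(Executor):
    """Hypothetical executor that runs each task immediately in the caller."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


def sum_of_squares(executor, values):
    """Depends only on the Executor API, not on any one implementation."""
    return sum(executor.map(lambda x: x * x, values))


serial_total = sum_of_squares(SerialExecutor(), range(5))

with ThreadPoolExecutor(2) as pool:
    threaded_total = sum_of_squares(pool, range(5))

print(serial_total, threaded_total)
```

The inherited Executor.map is built on submit, so implementing submit alone is enough for this toy executor to be a drop-in replacement.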
With IPython Parallel, every View has an .executor property that provides the Executor API for the given View. Just like Views, the assignment of work for an Executor depends on the View from which it was created.
You can get an Executor for any View by accessing its .executor property:
ex_all = rc[:].executor
ex_all.view.targets
[0, 1, 2, 3]
even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)
7891
7885
7891
7891
7891
7891
7885
7885
7885
7885
Typically, though, one will want an Executor for a LoadBalancedView on all the engines. In that case, use the top-level Client.executor method:
ex = rc.executor()
ex.view
Let’s make a few Executors:
The distributed executor assumes you have started a distributed cluster on the default local interface, e.g.
$> dcluster 127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import distributed

class DistributedExecutor(distributed.Executor):
    """Wrap distributed.Executor to provide standard Executor.map API

    distributed.Executor.map returns list of Futures,
    not iterable of results, like everything else.

    See blaze/distributed#91
    """
    def map(self, *args, **kwargs):
        list_of_futures = super().map(*args, **kwargs)
        for f in list_of_futures:
            yield f.result()

N = 4
ip_ex = rc.executor(targets=range(N))
dist_ex = DistributedExecutor('127.0.0.1:8787')
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)
executors = [process_ex, thread_ex, ip_ex, dist_ex]
We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:
for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))
ProcessPoolExecutor
['0', '1', '2', '3', '4']
ThreadPoolExecutor
['0', '1', '2', '3', '4']
ViewExecutor
['0', '1', '2', '3', '4']
DistributedExecutor
['0', '1', '2', '3', '4']
This makes it easy to compare the different implementations. We are going to submit some dummy work—allocate and compute 2-norms of arrays of various sizes.
def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.ones(n)
    return norm(A, 2)
sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)
sizes
array([ 1048576, 1261463, 1517571, 1825676, 2196334, 2642245, 3178688, 3824041, 4600417, 5534417, 6658042, 8009791, 9635980, 11592325, 13945857, 16777216])
Run the work locally, to get a reference:
print("Local time:")
%time ref = list(map(task, sizes))
Local time:
CPU times: user 161 ms, sys: 404 ms, total: 565 ms
Wall time: 560 ms
And then run again with the various Executors:
for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time the task assignment
    %timeit list(executor.map(task, sizes))
ProcessPoolExecutor
10 loops, best of 3: 126 ms per loop
ThreadPoolExecutor
10 loops, best of 3: 149 ms per loop
ViewExecutor
10 loops, best of 3: 151 ms per loop
DistributedExecutor
10 loops, best of 3: 141 ms per loop
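Outside IPython, where the %timeit magic is unavailable, a comparable measurement can be sketched with time.perf_counter. This is only an illustration under stated assumptions: time_map is a hypothetical helper, the task is a pure-Python stand-in for the numpy version (the 2-norm of n ones is sqrt(n)), the sizes are much smaller, and only a ThreadPoolExecutor is timed. Any object providing Executor.map could be passed instead.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor


def task(n):
    """2-norm of a length-n vector of ones, computed in pure Python."""
    return math.sqrt(sum(1.0 for _ in range(n)))


def time_map(executor, fn, inputs, repeats=3):
    """Return (best wall time in seconds, results) over several runs."""
    best = float("inf")
    results = None
    for _ in range(repeats):
        start = time.perf_counter()
        results = list(executor.map(fn, inputs))
        best = min(best, time.perf_counter() - start)
    return best, results


sizes = [2 ** k for k in range(8, 12)]
ref = list(map(task, sizes))  # local reference results

with ThreadPoolExecutor(4) as pool:
    elapsed, results = time_map(pool, task, sizes)

assert results == ref
print("ThreadPoolExecutor best of 3: %.3f ms" % (elapsed * 1e3))
```

Taking the best of several runs mirrors the "best of 3" figure that %timeit reports above, though the absolute numbers are not comparable with the notebook's.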
For this toy work, the stdlib ProcessPoolExecutor appears to perform the best (though in testing, it seems to crash quite a bit). That’s useful info. One benefit of IPython Parallel or Distributed Executors over the stdlib Executors is that they do not have to be confined to a single machine. This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes, and then extend the exact same code to make use of multiple machines, just by selecting a different Executor.
That seems pretty useful.