Futures in IPython Parallel
The IPython Parallel AsyncResult object extends concurrent.futures.Future, which makes it compatible with most async frameworks in Python.
import ipyparallel as ipp

rc = ipp.Cluster(n=4).start_and_connect_sync()
dv = rc[:]
dv.activate()
dv
Using existing profile dir: '/Users/minrk/.ipython/profile_default'
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
<DirectView [0, 1, 2, 3]>
Do some imports everywhere
%%px --local --block
import os
import time
import numpy
from numpy.linalg import norm

def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.random.random(n)
    return norm(A, 2)
The basic async API hasn’t changed:
f = rc[-1].apply(random_norm, 100)
f
But the full Futures API is now available:
The standard Futures API has methods for registering callbacks, among other things.
import os

f = rc[-1].apply(os.getpid)
# the callback receives the finished Future as its only argument
f.add_done_callback(lambda fut: print("I got PID: %i" % fut.result()))
f.result()
I got PID: 12509
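The callback behaviour can be tried without a running cluster. The following is a minimal sketch that uses a bare concurrent.futures.Future from the standard library as a stand-in for an AsyncResult; the value 12509 is only a placeholder, and setting the result by hand is something the library normally does for you.

```python
from concurrent.futures import Future

received = []  # filled in by the callbacks

fut = Future()  # a bare stdlib Future, standing in for an AsyncResult
fut.add_done_callback(lambda f: received.append(f.result()))

# Nothing has happened yet: the callback fires only once a result is set.
assert received == []

fut.set_result(12509)  # placeholder value; normally set by the library

# A callback added to an already-finished Future is called immediately.
fut.add_done_callback(lambda f: received.append("late"))

print(received)
```

Callbacks always receive the Future itself, so they call .result() (or .exception()) to get at the outcome.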
A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:
import asyncio
import sys

from tornado.ioloop import IOLoop


def sleep_task(t):
    time.sleep(t)
    return os.getpid()


async def background():
    """A background coroutine to demonstrate that we aren't blocking"""
    while True:
        await asyncio.sleep(1)
        print('.', end=' ')
        sys.stdout.flush()  # not needed after ipykernel 4.3


async def work():
    """Submit some work and print the results when complete"""
    for t in [1, 2, 3, 4]:
        ar = rc[:].apply(sleep_task, t)
        result = await asyncio.wrap_future(ar)  # this waits
        print(result)


bg = asyncio.Task(background())
await work()
bg.cancel();
. [12507, 12506, 12508, 12509]
. . [12507, 12506, 12508, 12509]
. . . [12507, 12506, 12508, 12509]
. . . . [12507, 12506, 12508, 12509]
So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
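The same pattern can be sketched with only the standard library, which may help when experimenting without a cluster. Here a ThreadPoolExecutor future plays the role of the AsyncResult; the names ticker and main are made up for this illustration, and the timings are arbitrary.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_task(t):
    """Blocking work, run in a worker thread."""
    time.sleep(t)
    return t


async def ticker(ticks):
    """Record a tick periodically to show the event loop stays responsive."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def main():
    ticks = []
    bg = asyncio.ensure_future(ticker(ticks))
    with ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(sleep_task, 0.1)       # a concurrent.futures.Future
        result = await asyncio.wrap_future(fut)  # awaitable wrapper around it
    bg.cancel()
    return result, len(ticks)


result, n_ticks = asyncio.run(main())
print(result, n_ticks > 0)
```

Because the wait happens through asyncio.wrap_future, the ticker coroutine keeps running while the blocking task sleeps in its worker thread.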
Executors are a standard Python API provided by various job-submission tools. Because different libraries expose the same Executor API for asynchronous execution, implementations can easily be swapped for one another and compared, or the best one for a given context can be chosen, without changing the calling code.
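To make the idea of a shared API concrete, here is a minimal sketch using only the standard library. InlineExecutor and sum_of_squares are hypothetical names invented for this example: the first is a toy Executor that runs tasks immediately in the calling thread, the second is code written against the Executor interface rather than a particular implementation.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class InlineExecutor(Executor):
    """Hypothetical Executor that runs each task immediately in the caller."""

    def submit(self, fn, *args, **kwargs):
        fut = Future()
        try:
            fut.set_result(fn(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)
        return fut


def sum_of_squares(executor, values):
    """Uses only the Executor API, so any implementation can be passed in."""
    return sum(executor.map(lambda x: x * x, values))


with ThreadPoolExecutor(max_workers=2) as pool:
    threaded = sum_of_squares(pool, range(5))
inline = sum_of_squares(InlineExecutor(), range(5))

print(threaded, inline)  # 30 30
```

Only submit() is implemented here; map() is inherited from the Executor base class, which builds it on top of submit().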
With IPython Parallel, every View has an .executor property that provides the Executor API for the given View.
Just like Views, the assignment of work for an Executor depends on the View from which it was created.
You can get an Executor for any View by accessing its .executor property:
ex_all = rc[:].executor
ex_all.view.targets
[0, 1, 2, 3]
even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)
12507 12508 12507 12508 12507 12508 12507 12508 12507 12508
Typically, though, one will want an Executor for a LoadBalancedView on all the engines.
This is what the top-level Client.executor() method will return:
ex = rc.executor()
ex.view
Let’s create a few compatible Executor instances. Aside: dask.distributed is a great library, and any IPython Parallel cluster can be bootstrapped into a dask cluster.
There can be serialization differences, especially for interactively defined functions (i.e. those defined in a notebook itself). That’s why we define our task function in a local module, rather than here: ProcessPoolExecutor doesn’t serialize interactively defined functions. For the most part, though, functions defined in modules work consistently across implementations.
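The difference can be illustrated with the standard pickle module, which ProcessPoolExecutor relies on to send functions to its worker processes. This is a minimal sketch: can_pickle is a hypothetical helper, and a lambda is used as the clearest case of a function that has no importable name. IPython Parallel and dask use their own serialization layers, so the result below says nothing about them.

```python
import math
import pickle


def can_pickle(obj):
    """Return True if the stdlib pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# pickle stores functions by reference (module plus name), so a function
# that lives in an importable module round-trips without trouble.
module_level = can_pickle(math.sqrt)

# A lambda has no name that a worker process could import.
interactive_like = can_pickle(lambda x: x)

print(module_level, interactive_like)  # True False
```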
%pycat task_mod.py
from task_mod import task
import numpy
from numpy.linalg import norm

def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    A = numpy.ones(n)
    return norm(A, 2)

def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.ones(n)
    return norm(A, 2)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import distributed

distributed_client = rc.become_dask()
dist_ex = distributed_client.get_executor()

N = 4
ip_ex = rc.executor(targets=range(N))
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)
executors = [process_ex, thread_ex, ip_ex, dist_ex]
We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:
for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))
This makes it easy to compare the different implementations. We are going to submit some dummy work—allocate and compute 2-norms of arrays of various sizes.
# numpy was imported above under its full name, so use numpy rather than np
sizes = numpy.logspace(20, 24, 16, base=2, dtype=int)
sizes
array([ 1048576, 1261463, 1517571, 1825676, 2196334, 2642245, 3178688, 3824041, 4600417, 5534417, 6658042, 8009791, 9635980, 11592325, 13945857, 16777216])
Run the work locally, to get a reference:
print("Local time:")
%time ref = list(map(task, sizes))
Local time: CPU times: user 765 ms, sys: 403 ms, total: 1.17 s Wall time: 874 ms
And then run again with the various Executors:
for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time the task assignment
    %timeit list(executor.map(task, sizes))
ProcessPoolExecutor
246 ms ± 86 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ThreadPoolExecutor
182 ms ± 32.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ViewExecutor
228 ms ± 24.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
ClientExecutor
246 ms ± 27.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
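The %timeit magic is only available inside IPython. Outside a notebook, a comparable measurement can be sketched with time.perf_counter. In the example below, toy_task and time_map are hypothetical names, the workload is a small pure-Python stand-in rather than the numpy task above, and the helper reports the best of a few runs instead of the mean and standard deviation that %timeit prints.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def toy_task(n):
    """Hypothetical stand-in for task(): a small pure-Python computation."""
    return sum(range(n))


def time_map(executor, fn, inputs, repeats=3):
    """Return (results, best wall time in seconds) for executor.map."""
    best = float("inf")
    results = None
    for _ in range(repeats):
        start = time.perf_counter()
        results = list(executor.map(fn, inputs))
        best = min(best, time.perf_counter() - start)
    return results, best


inputs = [10, 100, 1000]
reference = list(map(toy_task, inputs))  # local reference, as above

with ThreadPoolExecutor(max_workers=4) as pool:
    results, best = time_map(pool, toy_task, inputs)

assert results == reference
print(f"ThreadPoolExecutor: best of 3 = {best * 1e3:.3f} ms")
```

Since time_map only calls executor.map, it should accept any of the Executors created earlier in place of the thread pool.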
For this toy work, the stdlib ThreadPoolExecutor appears to perform the best. That’s useful info, and likely to be true for most workloads that release the GIL and fit comfortably into memory. When the GIL is involved, ProcessPoolExecutor is often best for simple workloads.
One benefit of IPython Parallel or Distributed Executors over the stdlib Executors is that they do not have to be confined to a single machine. This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes, and then extend the exact same code to make use of multiple machines, just by selecting a different Executor.